Skip to content

Commit

Permalink
partial revert
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 24, 2024
1 parent bf5ad1e commit d892f09
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 32 deletions.
6 changes: 3 additions & 3 deletions cpp/velox/shuffle/RadixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class RadixSort {
auto offsets = transformCountsToOffsets(counts, outIndex);

for (auto offset = inIndex; offset < inIndex + numRecords; ++offset) {
auto bucket = (array[offset] >> (byteIdx * 8)) & 0xff;
auto bucket = (array[offset].first >> (byteIdx * 8)) & 0xff;
array[offsets[bucket]++] = array[offset];
}
}
Expand All @@ -112,7 +112,7 @@ class RadixSort {
int64_t bitwiseMax = 0;
int64_t bitwiseMin = -1L;
for (auto offset = 0; offset < numRecords; ++offset) {
auto value = array[offset];
auto value = array[offset].first;
bitwiseMax |= value;
bitwiseMin &= value;
}
Expand All @@ -123,7 +123,7 @@ class RadixSort {
if (((bitsChanged >> (i * 8)) & 0xff) != 0) {
counts[i].resize(256);
for (auto offset = 0; offset < numRecords; ++offset) {
counts[i][(array[offset] >> (i * 8)) & 0xff]++;
counts[i][(array[offset].first >> (i * 8)) & 0xff]++;
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,9 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
auto buffer = cur->second;
const auto* rawBuffer = buffer->as<char>();
while (rowOffset_ < cur->first && readRows < batchSize_) {
auto rowSize = *(RowSizeType*)(rawBuffer + byteOffset_);
data.push_back(std::string_view(rawBuffer + byteOffset_ + sizeof(RowSizeType), rowSize - sizeof(RowSizeType)));
auto rowSize = *(uint32_t*)(rawBuffer + byteOffset_);
byteOffset_ += sizeof(uint32_t);
data.push_back(std::string_view(rawBuffer + byteOffset_, rowSize));
byteOffset_ += rowSize;
++rowOffset_;
++readRows;
Expand Down
38 changes: 15 additions & 23 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,13 @@ arrow::Status VeloxSortShuffleWriter::init() {
options_.partitioning == Partitioning::kSingle,
arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
array_.resize(initialSize_);
partitionRawSize_.resize(numPartitions_, 0);
return arrow::Status::OK();
}

void VeloxSortShuffleWriter::initRowType(const facebook::velox::RowVectorPtr& rv) {
if (UNLIKELY(!rowType_)) {
rowType_ = facebook::velox::asRowType(rv->type());
fixedRowSize_ = facebook::velox::row::CompactRow::fixedRowSize(rowType_);
if (fixedRowSize_) {
*fixedRowSize_ += sizeof(RowSizeType);
}
}
}

Expand Down Expand Up @@ -157,7 +153,7 @@ arrow::Status VeloxSortShuffleWriter::insert(const facebook::velox::RowVectorPtr
rowSizes_.resize(inputRows + 1);
rowSizes_[0] = 0;
for (auto i = 0; i < inputRows; ++i) {
rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i) + sizeof(RowSizeType);
rowSizes_[i + 1] = rowSizes_[i] + row.rowSize(i);
}
}

Expand All @@ -183,15 +179,8 @@ void VeloxSortShuffleWriter::insertRows(facebook::velox::row::CompactRow& row, u
// Allocate newArray can trigger spill.
growArrayIfNecessary(rows);
for (auto i = offset; i < offset + rows; ++i) {
auto pid = row2Partition_[i];
array_[offset_++] = toCompactRowId(pid, pageNumber_, pageCursor_);

// (RowSizeType)size | serialized row
auto dest = currentPage_ + pageCursor_;
RowSizeType size = sizeof(RowSizeType) + row.serialize(i, dest + sizeof(RowSizeType));
memcpy(dest, &size, sizeof(RowSizeType));

partitionRawSize_[pid] += size;
auto size = row.serialize(i, currentPage_ + pageCursor_);
array_[offset_++] = {toCompactRowId(row2Partition_[i], pageNumber_, pageCursor_), size};
pageCursor_ += size;
}
}
Expand All @@ -209,7 +198,6 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
int32_t begin = 0;
{
ScopedTimer timer(&sortTime_);
// TODO: Add radix sort to align with Spark.
if (useRadixSort_) {
begin = RadixSort<SortArray>::sort(array_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex);
} else {
Expand All @@ -219,9 +207,9 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {

auto end = begin + numRecords;
auto cur = begin;
auto pid = extractPartitionId(array_[begin]);
auto pid = extractPartitionId(array_[begin].first);
while (++cur < end) {
auto curPid = extractPartitionId(array_[cur]);
auto curPid = extractPartitionId(array_[cur].first);
if (curPid != pid) {
RETURN_NOT_OK(evictPartition(pid, begin, cur));
pid = curPid;
Expand Down Expand Up @@ -249,8 +237,10 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
// Serialize [begin, end)
uint32_t numRows = end - begin;
uint64_t rawSize = partitionRawSize_[partitionId];
partitionRawSize_[partitionId] = 0;
uint64_t rawSize = numRows * sizeof(RowSizeType);
for (auto i = begin; i < end; ++i) {
rawSize += array_[i].second;
}

if (sortedBuffer_ == nullptr || sortedBuffer_->size() < rawSize) {
sortedBuffer_ = nullptr;
Expand All @@ -260,10 +250,12 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_

uint64_t offset = 0;
for (auto i = begin; i < end; ++i) {
auto index = extractPageNumberAndOffset(array_[i]);
const auto* src = pageAddresses_[index.first] + index.second;
auto size = *(RowSizeType*)(src);
memcpy(rawBuffer + offset, src, size);
// size(size_t) | bytes
auto size = array_[i].second;
memcpy(rawBuffer + offset, &size, sizeof(RowSizeType));
offset += sizeof(RowSizeType);
auto index = extractPageNumberAndOffset(array_[i].first);
memcpy(rawBuffer + offset, pageAddresses_[index.first] + index.second, size);
offset += size;
}
VELOX_CHECK_EQ(offset, rawSize);
Expand Down
7 changes: 3 additions & 4 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {

void growArrayIfNecessary(uint32_t rows);

using Allocator = facebook::velox::StlAllocator<uint64_t>;
using SortArray = std::vector<uint64_t, Allocator>;
using ElementType = std::pair<uint64_t, RowSizeType>;
using Allocator = facebook::velox::StlAllocator<ElementType>;
using SortArray = std::vector<ElementType, Allocator>;

std::unique_ptr<facebook::velox::HashStringAllocator> allocator_;
// Stores compact row id -> row
Expand All @@ -109,8 +110,6 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
// Updated for each input RowVector.
std::vector<uint32_t> row2Partition_;

std::vector<uint64_t> partitionRawSize_;

std::shared_ptr<const facebook::velox::RowType> rowType_;
std::optional<int32_t> fixedRowSize_;
std::vector<uint64_t> rowSizes_;
Expand Down

0 comments on commit d892f09

Please sign in to comment.