Skip to content

Commit

Permalink
fix qsort
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jul 26, 2024
1 parent b7f60f6 commit 784cb36
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 10 deletions.
8 changes: 7 additions & 1 deletion cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
begin = RadixSort<Element>::sort(
arrayPtr_, arraySize_, numRecords, kPartitionIdStartByteIndex, kPartitionIdEndByteIndex);
} else {
qsort(arrayPtr_, arraySize_, sizeof(Element), Element::compare);
auto ptr = arrayPtr_;
qsort(ptr, numRecords, sizeof(Element), compare);
}
}

Expand Down Expand Up @@ -369,4 +370,9 @@ int64_t VeloxSortShuffleWriter::totalSortTime() const {
int64_t VeloxSortShuffleWriter::totalC2RTime() const {
return c2rTime_;
}

int VeloxSortShuffleWriter::compare(const void* a, const void* b) {
// No same values.
return ((Element*)a)->value > ((Element*)b)->value ? 1 : -1;
}
} // namespace gluten
6 changes: 2 additions & 4 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,10 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
struct Element {
uint64_t value;
uint32_t rowSize;

static int compare(const void* a, const void* b) {
return ((Element*)a)->value - ((Element*)b)->value;
}
};

static int compare(const void* a, const void* b);

// Stores compact row id -> row
facebook::velox::BufferPtr array_;
Element* arrayPtr_;
Expand Down
11 changes: 7 additions & 4 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,21 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};

for (const auto& compression : compressions) {
for (auto useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort});
}
params.push_back(
ShuffleTestParams{ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0});
params.push_back(
ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0});
ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0, false});
for (const auto compressionThreshold : compressionThresholds) {
for (const auto mergeBufferSize : mergeBufferSizes) {
params.push_back(ShuffleTestParams{
ShuffleWriterType::kHashShuffle,
PartitionWriterType::kLocal,
compression,
compressionThreshold,
mergeBufferSize});
mergeBufferSize,
false /* unused */});
}
params.push_back(ShuffleTestParams{
ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0});
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ struct ShuffleTestParams {
arrow::Compression::type compressionType;
int32_t compressionThreshold;
int32_t mergeBufferSize;
bool useRadixSort;

std::string toString() const {
std::ostringstream out;
out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType
<< ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold
<< ", mergeBufferSize = " << mergeBufferSize;
<< ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << (useRadixSort ? "true" : "false");
return out.str();
}
};
Expand Down Expand Up @@ -250,6 +251,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
RETURN_NOT_OK(VeloxShuffleWriterTestBase::initShuffleWriterOptions());

ShuffleTestParams params = GetParam();
shuffleWriterOptions_.useRadixSort = params.useRadixSort;
partitionWriterOptions_.compressionType = params.compressionType;
switch (partitionWriterOptions_.compressionType) {
case arrow::Compression::UNCOMPRESSED:
Expand Down

0 comments on commit 784cb36

Please sign in to comment.