Skip to content

Commit

Permalink
[VL] Enable sort-based shuffle in micro benchmark (apache#5942)
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma authored Jun 17, 2024
1 parent e2f90e9 commit e4388e6
Show file tree
Hide file tree
Showing 22 changed files with 592 additions and 603 deletions.
1 change: 1 addition & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
shuffle/Partitioning.cc
shuffle/Payload.cc
shuffle/rss/RssPartitionWriter.cc
shuffle/RandomPartitioner.cc
shuffle/RoundRobinPartitioner.cc
shuffle/ShuffleMemoryPool.cc
shuffle/ShuffleReader.cc
Expand Down
18 changes: 10 additions & 8 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -756,14 +756,23 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
throw gluten::GlutenException(std::string("Short partitioning name can't be null"));
}

// Build ShuffleWriterOptions.
auto shuffleWriterOptions = ShuffleWriterOptions{
.bufferSize = bufferSize,
.bufferReallocThreshold = reallocThreshold,
.partitioning = gluten::toPartitioning(jStringToCString(env, partitioningNameJstr)),
.taskAttemptId = (int64_t)taskAttemptId,
.startPartitionId = startPartitionId,
};
auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr, JNI_FALSE);
auto shuffleWriterType = std::string(shuffleWriterTypeC);
env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC);

if (shuffleWriterType == "sort") {
shuffleWriterOptions.shuffleWriterType = kSortShuffle;
}

// Build PartitionWriterOptions.
auto partitionWriterOptions = PartitionWriterOptions{
.mergeBufferSize = mergeBufferSize,
.mergeThreshold = mergeThreshold,
Expand All @@ -779,20 +788,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
partitionWriterOptions.codecBackend = getCodecBackend(env, codecBackendJstr);
partitionWriterOptions.compressionMode = getCompressionMode(env, compressionModeJstr);
}

std::unique_ptr<PartitionWriter> partitionWriter;

auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, JNI_FALSE);
auto partitionWriterType = std::string(partitionWriterTypeC);
env->ReleaseStringUTFChars(partitionWriterTypeJstr, partitionWriterTypeC);

auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr, JNI_FALSE);
auto shuffleWriterType = std::string(shuffleWriterTypeC);
env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC);

if (shuffleWriterType == "sort") {
shuffleWriterOptions.shuffleWriterType = kSortShuffle;
}

if (partitionWriterType == "local") {
if (dataFileJstr == NULL) {
throw gluten::GlutenException(std::string("Shuffle DataFile can't be null"));
Expand Down
3 changes: 1 addition & 2 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "shuffle/Payload.h"
#include "shuffle/Spill.h"
#include "shuffle/Utils.h"
#include "utils/Timer.h"

namespace gluten {

Expand Down Expand Up @@ -547,7 +546,7 @@ arrow::Status LocalPartitionWriter::evict(
arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) {
rawPartitionLengths_[partitionId] += rawSize;

if (partitionId <= lastEvictPid_) {
if (partitionId < lastEvictPid_) {
RETURN_NOT_OK(finishSpill());
}
lastEvictPid_ = partitionId;
Expand Down
3 changes: 3 additions & 0 deletions cpp/core/shuffle/Partitioner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "shuffle/Partitioner.h"
#include "shuffle/FallbackRangePartitioner.h"
#include "shuffle/HashPartitioner.h"
#include "shuffle/RandomPartitioner.h"
#include "shuffle/RoundRobinPartitioner.h"
#include "shuffle/SinglePartitioner.h"

Expand All @@ -34,6 +35,8 @@ Partitioner::make(Partitioning partitioning, int32_t numPartitions, int32_t star
return std::make_shared<SinglePartitioner>();
case Partitioning::kRange:
return std::make_shared<FallbackRangePartitioner>(numPartitions);
case Partitioning::kRandom:
return std::make_shared<RandomPartitioner>(numPartitions);
default:
return arrow::Status::Invalid("Unsupported partitioning type: " + std::to_string(partitioning));
}
Expand Down
4 changes: 4 additions & 0 deletions cpp/core/shuffle/Partitioning.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ static const std::string kSinglePartitioningName = "single";
static const std::string kRoundRobinPartitioningName = "rr";
static const std::string kHashPartitioningName = "hash";
static const std::string kRangePartitioningName = "range";
static const std::string kRandomPartitioningName = "random";
} // namespace

namespace gluten {
Expand All @@ -39,6 +40,9 @@ Partitioning toPartitioning(std::string name) {
if (name == kRangePartitioningName) {
return Partitioning::kRange;
}
if (name == kRandomPartitioningName) {
return Partitioning::kRandom;
}
throw GlutenException("Invalid partition name: " + name);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Partitioning.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <string>

namespace gluten {
enum Partitioning { kSingle, kRoundRobin, kHash, kRange };
enum Partitioning { kSingle, kRoundRobin, kHash, kRange, kRandom /*for test only*/ };

Partitioning toPartitioning(std::string name);

Expand Down
2 changes: 2 additions & 0 deletions cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
}

arrow::Result<std::shared_ptr<arrow::Buffer>> UncompressedDiskBlockPayload::readUncompressedBuffer() {
ScopedTimer timer(&writeTime_);
readPos_++;
int64_t bufferLength;
RETURN_NOT_OK(inputStream_->Read(sizeof(int64_t), &bufferLength));
Expand All @@ -525,6 +526,7 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload(
: Payload(Type::kCompressed, numRows, isValidityBuffer), inputStream_(inputStream), rawSize_(rawSize) {}

arrow::Status CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
ScopedTimer timer(&writeTime_);
ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
RETURN_NOT_OK(outputStream->Write(block));
return arrow::Status::OK();
Expand Down
56 changes: 56 additions & 0 deletions cpp/core/shuffle/RandomPartitioner.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "shuffle/RandomPartitioner.h"

namespace gluten {

arrow::Status gluten::RandomPartitioner::compute(
const int32_t* pidArr,
const int64_t numRows,
std::vector<uint32_t>& row2Partition,
std::vector<uint32_t>& partition2RowCount) {
std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0);
row2Partition.resize(numRows);

for (int32_t i = 0; i < numRows; ++i) {
row2Partition[i] = dist_(rng_);
}

for (auto& pid : row2Partition) {
partition2RowCount[pid]++;
}

return arrow::Status::OK();
}

arrow::Status gluten::RandomPartitioner::compute(
const int32_t* pidArr,
const int64_t numRows,
const int32_t vectorIndex,
std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) {
auto index = static_cast<int64_t>(vectorIndex) << 32;
for (int32_t i = 0; i < numRows; ++i) {
int64_t combined = index | (i & 0xFFFFFFFFLL);
auto& vec = rowVectorIndexMap[dist_(rng_)];
vec.push_back(combined);
}

return arrow::Status::OK();
}

} // namespace gluten
48 changes: 48 additions & 0 deletions cpp/core/shuffle/RandomPartitioner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <random>
#include "shuffle/Partitioner.h"

namespace gluten {
class RandomPartitioner final : public Partitioner {
public:
RandomPartitioner(int32_t numPartitions) : Partitioner(numPartitions, false) {
std::random_device dev;
rng_.seed(dev());
dist_ = std::uniform_int_distribution<std::mt19937::result_type>(0, numPartitions - 1);
}

arrow::Status compute(
const int32_t* pidArr,
const int64_t numRows,
std::vector<uint32_t>& row2Partition,
std::vector<uint32_t>& partition2RowCount) override;

arrow::Status compute(
const int32_t* pidArr,
const int64_t numRows,
const int32_t vectorIndex,
std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) override;

private:
std::mt19937 rng_;
std::uniform_int_distribution<std::mt19937::result_type> dist_;
};
} // namespace gluten
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ set(VELOX_SRCS
operators/serializer/VeloxRowToColumnarConverter.cc
operators/writer/VeloxParquetDatasource.cc
shuffle/VeloxShuffleReader.cc
shuffle/VeloxShuffleWriter.cc
shuffle/VeloxHashBasedShuffleWriter.cc
shuffle/VeloxSortBasedShuffleWriter.cc
substrait/SubstraitParser.cc
Expand Down
2 changes: 0 additions & 2 deletions cpp/velox/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)

add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)

add_velox_benchmark(shuffle_split_benchmark ShuffleSplitBenchmark.cc)

if(ENABLE_ORC)
add_velox_benchmark(orc_converter exec/OrcConverter.cc)
endif()
Loading

0 comments on commit e4388e6

Please sign in to comment.