diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index dc9ce3435c380..c673a678d98c8 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -192,6 +192,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/Partitioning.cc shuffle/Payload.cc shuffle/rss/RssPartitionWriter.cc + shuffle/RandomPartitioner.cc shuffle/RoundRobinPartitioner.cc shuffle/ShuffleMemoryPool.cc shuffle/ShuffleReader.cc diff --git a/cpp/core/shuffle/Partitioner.cc b/cpp/core/shuffle/Partitioner.cc index 80b4598a1f170..fb1a5aab44ebd 100644 --- a/cpp/core/shuffle/Partitioner.cc +++ b/cpp/core/shuffle/Partitioner.cc @@ -18,6 +18,7 @@ #include "shuffle/Partitioner.h" #include "shuffle/FallbackRangePartitioner.h" #include "shuffle/HashPartitioner.h" +#include "shuffle/RandomPartitioner.h" #include "shuffle/RoundRobinPartitioner.h" #include "shuffle/SinglePartitioner.h" @@ -34,6 +35,8 @@ Partitioner::make(Partitioning partitioning, int32_t numPartitions, int32_t star return std::make_shared(); case Partitioning::kRange: return std::make_shared(numPartitions); + case Partitioning::kRandom: + return std::make_shared(numPartitions); default: return arrow::Status::Invalid("Unsupported partitioning type: " + std::to_string(partitioning)); } diff --git a/cpp/core/shuffle/Partitioning.cc b/cpp/core/shuffle/Partitioning.cc index dfe848d630466..84fe6ecd972f9 100644 --- a/cpp/core/shuffle/Partitioning.cc +++ b/cpp/core/shuffle/Partitioning.cc @@ -23,6 +23,7 @@ static const std::string kSinglePartitioningName = "single"; static const std::string kRoundRobinPartitioningName = "rr"; static const std::string kHashPartitioningName = "hash"; static const std::string kRangePartitioningName = "range"; +static const std::string kRandomPartitioningName = "random"; } // namespace namespace gluten { @@ -39,6 +40,9 @@ Partitioning toPartitioning(std::string name) { if (name == kRangePartitioningName) { return Partitioning::kRange; } + if (name == kRandomPartitioningName) { + return Partitioning::kRandom; + 
} throw GlutenException("Invalid partition name: " + name); } diff --git a/cpp/core/shuffle/Partitioning.h b/cpp/core/shuffle/Partitioning.h index 1d65e9d6b9933..a60d43561bee1 100644 --- a/cpp/core/shuffle/Partitioning.h +++ b/cpp/core/shuffle/Partitioning.h @@ -20,7 +20,7 @@ #include namespace gluten { -enum Partitioning { kSingle, kRoundRobin, kHash, kRange }; +enum Partitioning { kSingle, kRoundRobin, kHash, kRange, kRandom /*for test only*/ }; Partitioning toPartitioning(std::string name); diff --git a/cpp/core/shuffle/RandomPartitioner.cc b/cpp/core/shuffle/RandomPartitioner.cc new file mode 100644 index 0000000000000..06d87be40f7f1 --- /dev/null +++ b/cpp/core/shuffle/RandomPartitioner.cc @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "shuffle/RandomPartitioner.h" + +namespace gluten { +/* Assigns every row a partition id drawn from dist_(rng_) and tallies the rows per partition in partition2RowCount; pidArr is not read. Always returns OK. */ +arrow::Status gluten::RandomPartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + std::vector& row2Partition, + std::vector& partition2RowCount) { + std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0); + row2Partition.resize(numRows); + /* The index is int64_t to match numRows; a 32-bit counter would overflow before reaching a larger bound. */ + for (int64_t i = 0; i < numRows; ++i) { + row2Partition[i] = dist_(rng_); + } + + for (auto& pid : row2Partition) { + partition2RowCount[pid]++; + } + + return arrow::Status::OK(); +} +/* For every row, appends (vectorIndex << 32 | row index) to the rowVectorIndexMap entry keyed by a partition id drawn from dist_(rng_); pidArr is not read. Always returns OK. */ +arrow::Status gluten::RandomPartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map>& rowVectorIndexMap) { + auto index = static_cast(vectorIndex) << 32; + for (int64_t i = 0; i < numRows; ++i) { /* int64_t index matches numRows; only its low 32 bits are packed below */ + int64_t combined = index | (i & 0xFFFFFFFFLL); + auto& vec = rowVectorIndexMap[dist_(rng_)]; + vec.push_back(combined); + } + + return arrow::Status::OK(); +} + +} // namespace gluten diff --git a/cpp/core/shuffle/RandomPartitioner.h b/cpp/core/shuffle/RandomPartitioner.h new file mode 100644 index 0000000000000..77d00716943cd --- /dev/null +++ b/cpp/core/shuffle/RandomPartitioner.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include "shuffle/Partitioner.h" + +namespace gluten { +class RandomPartitioner final : public Partitioner { /* Partitioner whose compute() overloads take each row's partition id from dist_(rng_). */ + public: + explicit RandomPartitioner(int32_t numPartitions) : Partitioner(numPartitions, false) { /* explicit: no implicit int32_t -> RandomPartitioner conversion */ + std::random_device dev; + rng_.seed(dev()); + dist_ = std::uniform_int_distribution(0, numPartitions - 1); /* ids are drawn from the closed range [0, numPartitions - 1] */ + } + /* Fills row2Partition and partition2RowCount for numRows rows. */ + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + std::vector& row2Partition, + std::vector& partition2RowCount) override; + /* Appends one packed (vectorIndex, row index) value per row to rowVectorIndexMap. */ + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map>& rowVectorIndexMap) override; + + private: + std::mt19937 rng_; /* seeded from std::random_device in the constructor */ + std::uniform_int_distribution dist_; /* configured in the constructor */ +}; +} // namespace gluten diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 3402675a41b6e..6403d56e62ebd 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -47,7 +47,10 @@ namespace { DEFINE_bool(print_result, true, "Print result for execution"); DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator"); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); -DEFINE_string(partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single"); +DEFINE_string( + partitioning, + "rr", + "Short partitioning name. Valid options are rr, hash, range, single, random (only for test purpose)"); DEFINE_string(shuffle_writer, "hash", "Shuffle writer type.
Can be hash or sort"); DEFINE_bool(rss, false, "Mocking rss."); DEFINE_string( @@ -140,11 +143,7 @@ std::shared_ptr createShuffleWriter( } auto options = ShuffleWriterOptions{}; - if (FLAGS_run_shuffle) { - options.partitioning = Partitioning::kRoundRobin; - } else { - options.partitioning = gluten::toPartitioning(FLAGS_partitioning); - } + options.partitioning = gluten::toPartitioning(FLAGS_partitioning); if (FLAGS_shuffle_writer == "sort") { options.shuffleWriterType = gluten::kSortShuffle; } @@ -439,6 +438,8 @@ int main(int argc, char** argv) { std::string errorMsg{}; if (FLAGS_data.empty()) { errorMsg = "Missing '--split' or '--data' option."; + } else if (FLAGS_partitioning != "rr" && FLAGS_partitioning != "random") { + errorMsg = "--run-shuffle only support round-robin partitioning and random partitioning."; } if (errorMsg.empty()) { try { diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index 956d4995f3037..21f222b42690d 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -324,8 +324,9 @@ cd /path/to/gluten/cpp/build/velox/benchmarks Developers can only run shuffle write task via specifying `--run-shuffle` and `--data` options. The parquet format input will be read from arrow-parquet reader and sent to shuffle writer. -This option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files, -and doesn't support the `--partitioning` option. The round-robin partitioner is used by default. +This option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files. +The round-robin partitioner is used by default. In addition, random partitioning can be used for testing purposes. +When `--partitioning random` is specified, the partitioner generates a random partition id for each row. ```shell cd /path/to/gluten/cpp/build/velox/benchmarks