diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index fc5d758f8c8b..031be791bdf0 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -609,11 +609,23 @@ arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr< if (lastEvictPid_ != -1 && partitionId < lastEvictPid_) { RETURN_NOT_OK(finishSpill(true)); + lastEvictPid_ = -1; } - lastEvictPid_ = partitionId; - RETURN_NOT_OK(requestSpill(stop)); - RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); + + if (!stop) { + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); + } else { + if (spills_.size() > 0) { + for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) { + auto bytesEvicted = totalBytesEvicted_; + RETURN_NOT_OK(mergeSpills(pid)); + partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted; + } + } + RETURN_NOT_OK(spiller_->spill(partitionId, std::move(blockPayload))); + } + lastEvictPid_ = partitionId; return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index daeef24ce5a6..b8d8274cb782 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -293,7 +293,6 @@ arrow::Result> BlockPayload::readBufferAt(uint32_ arrow::Result>> BlockPayload::deserialize( arrow::io::InputStream* inputStream, - const std::shared_ptr& schema, const std::shared_ptr& codec, arrow::MemoryPool* pool, uint32_t& numRows, diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index 4c53065a6ed9..0a317d9c3af9 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -88,7 +88,6 @@ class BlockPayload final : public Payload { static arrow::Result>> deserialize( arrow::io::InputStream* inputStream, - const std::shared_ptr& schema, const std::shared_ptr& codec, arrow::MemoryPool* pool, uint32_t& numRows, diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index a7776f9f1074..4b46dbd59941 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -30,6 +30,7 @@ #include "compute/VeloxPlanConverter.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" +#include "config/VeloxConfig.h" #include "shuffle/LocalPartitionWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "shuffle/rss/RssPartitionWriter.h" @@ -44,22 +45,23 @@ using namespace gluten; namespace { +DEFINE_bool(run_example, false, "Run the example and exit."); DEFINE_bool(print_result, true, "Print result for execution"); DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator"); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); +DEFINE_bool(run_shuffle, false, "Only run shuffle write."); +DEFINE_bool(run_shuffle_read, false, "Whether to run shuffle read when run_shuffle is true."); +DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort"); DEFINE_string( partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single, random (only for test purpose)"); -DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort"); DEFINE_bool(rss, false, "Mocking rss."); DEFINE_string( compression, "lz4", "Specify the compression codec. 
Valid options are lz4, zstd, qat_gzip, qat_zstd, iaa_gzip"); DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions"); -DEFINE_bool(run_shuffle, false, "Only run shuffle write."); -DEFINE_bool(run_example, false, "Run the example and exit."); DEFINE_string(plan, "", "Path to input json file of the substrait plan."); DEFINE_string( @@ -76,15 +78,21 @@ DEFINE_string( "Scan mode for reading parquet data." "'stream' mode: Input file scan happens inside of the pipeline." "'buffered' mode: First read all data into memory and feed the pipeline with it."); +DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting `spark.gluten.sql.debug`"); struct WriterMetrics { - int64_t splitTime; - int64_t evictTime; - int64_t writeTime; - int64_t compressTime; + int64_t splitTime{0}; + int64_t evictTime{0}; + int64_t writeTime{0}; + int64_t compressTime{0}; + + int64_t bytesSpilled{0}; + int64_t bytesWritten{0}; +}; - public: - explicit WriterMetrics() : splitTime(0), evictTime(0), writeTime(0), compressTime(0) {} +struct ReaderMetrics { + int64_t decompressTime{0}; + int64_t deserializeTime{0}; }; void setUpBenchmark(::benchmark::internal::Benchmark* bm) { @@ -98,9 +106,10 @@ void setUpBenchmark(::benchmark::internal::Benchmark* bm) { } } -std::shared_ptr -createShuffleWriter(Runtime* runtime, const std::string& dataFile, const std::vector& localDirs) { +PartitionWriterOptions createPartitionWriterOptions() { PartitionWriterOptions partitionWriterOptions{}; + // Disable writer's merge. + partitionWriterOptions.mergeThreshold = 0; // Configure compression. if (FLAGS_compression == "lz4") { @@ -121,27 +130,39 @@ createShuffleWriter(Runtime* runtime, const std::string& dataFile, const std::ve partitionWriterOptions.codecBackend = CodecBackend::IAA; partitionWriterOptions.compressionType = arrow::Compression::GZIP; } + return partitionWriterOptions; +} +std::unique_ptr createPartitionWriter( + Runtime* runtime, + PartitionWriterOptions options, + const std::string& dataFile, + const std::vector& localDirs) { std::unique_ptr partitionWriter; if (FLAGS_rss) { auto rssClient = std::make_unique(dataFile); partitionWriter = std::make_unique( FLAGS_shuffle_partitions, - std::move(partitionWriterOptions), + std::move(options), runtime->memoryManager()->getArrowMemoryPool(), std::move(rssClient)); } else { partitionWriter = std::make_unique( FLAGS_shuffle_partitions, - std::move(partitionWriterOptions), + std::move(options), runtime->memoryManager()->getArrowMemoryPool(), dataFile, localDirs); } + return partitionWriter; +} +std::shared_ptr createShuffleWriter( + Runtime* runtime, + std::unique_ptr partitionWriter) { auto options = ShuffleWriterOptions{}; options.partitioning = gluten::toPartitioning(FLAGS_partitioning); - if (FLAGS_rss) { + if (FLAGS_rss || FLAGS_shuffle_writer == "rss_sort") { options.shuffleWriterType = gluten::kRssSortShuffle; } else if (FLAGS_shuffle_writer == "sort") { options.shuffleWriterType = gluten::kSortShuffle; @@ -163,6 +184,8 @@ void populateWriterMetrics( if (splitTime > 0) { metrics.splitTime += splitTime; } + metrics.bytesWritten += shuffleWriter->totalBytesWritten(); + metrics.bytesSpilled += shuffleWriter->totalBytesEvicted(); } void setCpu(::benchmark::State& state) { @@ -171,7 +194,7 @@ void setCpu(::benchmark::State& state) { if (FLAGS_cpu != -1) { cpu += FLAGS_cpu; } - LOG(INFO) << "Setting CPU for thread " << state.thread_index() << " to " << cpu; + LOG(WARNING) << "Setting CPU for thread " << state.thread_index() << " to " << 
cpu; gluten::setCpu(cpu); } @@ -179,26 +202,56 @@ void runShuffle( Runtime* runtime, BenchmarkAllocationListener* listener, const std::shared_ptr& resultIter, - WriterMetrics& metrics) { + WriterMetrics& writerMetrics, + ReaderMetrics& readerMetrics, + bool readAfterWrite) { std::string dataFile; std::vector localDirs; bool isFromEnv; GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); - auto shuffleWriter = createShuffleWriter(runtime, dataFile, localDirs); + auto partitionWriterOptions = createPartitionWriterOptions(); + auto partitionWriter = createPartitionWriter(runtime, partitionWriterOptions, dataFile, localDirs); + auto shuffleWriter = createShuffleWriter(runtime, std::move(partitionWriter)); listener->setShuffleWriter(shuffleWriter.get()); int64_t totalTime = 0; + std::shared_ptr cSchema; { gluten::ScopedTimer timer(&totalTime); while (resultIter->hasNext()) { - GLUTEN_THROW_NOT_OK( - shuffleWriter->write(resultIter->next(), ShuffleWriter::kMaxMemLimit - shuffleWriter->cachedPayloadSize())); + auto cb = resultIter->next(); + if (!cSchema) { + cSchema = cb->exportArrowSchema(); + } + GLUTEN_THROW_NOT_OK(shuffleWriter->write(cb, ShuffleWriter::kMaxMemLimit - shuffleWriter->cachedPayloadSize())); } GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); } - populateWriterMetrics(shuffleWriter, totalTime, metrics); + populateWriterMetrics(shuffleWriter, totalTime, writerMetrics); + + if (readAfterWrite && cSchema) { + auto readerOptions = ShuffleReaderOptions{}; + readerOptions.shuffleWriterType = shuffleWriter->options().shuffleWriterType; + readerOptions.compressionType = partitionWriterOptions.compressionType; + readerOptions.codecBackend = partitionWriterOptions.codecBackend; + readerOptions.compressionTypeStr = partitionWriterOptions.compressionTypeStr; + + std::shared_ptr schema = + gluten::arrowGetOrThrow(arrow::ImportSchema(reinterpret_cast(cSchema.get()))); + auto reader = runtime->createShuffleReader(schema, readerOptions); + + GLUTEN_ASSIGN_OR_THROW(auto in, arrow::io::ReadableFile::Open(dataFile)); + // Read all partitions. + auto iter = reader->readStream(in); + while (iter->hasNext()) { + // Read and discard. 
+ auto cb = iter->next(); + } + readerMetrics.decompressTime = reader->getDecompressTime(); + readerMetrics.deserializeTime = reader->getDeserializeTime(); + } // Cleanup shuffle outputs cleanupShuffleOutput(dataFile, localDirs, isFromEnv); } @@ -207,20 +260,37 @@ void updateBenchmarkMetrics( ::benchmark::State& state, const int64_t& elapsedTime, const int64_t& readInputTime, - const WriterMetrics& writerMetrics) { + const WriterMetrics& writerMetrics, + const ReaderMetrics& readerMetrics) { state.counters["read_input_time"] = benchmark::Counter(readInputTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); state.counters["elapsed_time"] = benchmark::Counter(elapsedTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_write_time"] = benchmark::Counter( - writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_spill_time"] = benchmark::Counter( - writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_split_time"] = benchmark::Counter( - writerMetrics.splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_compress_time"] = benchmark::Counter( - writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + if (FLAGS_run_shuffle || FLAGS_with_shuffle) { + state.counters["shuffle_write_time"] = benchmark::Counter( + writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_spill_time"] = benchmark::Counter( + writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_compress_time"] = benchmark::Counter( + writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_decompress_time"] = benchmark::Counter( + readerMetrics.decompressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_deserialize_time"] = benchmark::Counter( + readerMetrics.deserializeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + + auto splitTime = writerMetrics.splitTime; + if (FLAGS_scan_mode == "stream") { + splitTime -= readInputTime; + } + state.counters["shuffle_split_time"] = + benchmark::Counter(splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + + state.counters["shuffle_spilled_bytes"] = benchmark::Counter( + writerMetrics.bytesSpilled, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024); + state.counters["shuffle_write_bytes"] = benchmark::Counter( + writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024); + } } } // namespace @@ -246,6 +316,7 @@ auto BM_Generic = [](::benchmark::State& state, } WriterMetrics writerMetrics{}; + ReaderMetrics readerMetrics{}; int64_t readInputTime = 0; int64_t elapsedTime = 0; @@ -275,7 +346,7 @@ auto BM_Generic = [](::benchmark::State& state, listenerPtr->setIterator(resultIter.get()); if (FLAGS_with_shuffle) { - runShuffle(runtime, listenerPtr, resultIter, writerMetrics); + runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, false); } else { // May write the output into file. 
auto veloxPlan = dynamic_cast(runtime)->getVeloxPlan(); @@ -299,7 +370,7 @@ auto BM_Generic = [](::benchmark::State& state, return; } if (FLAGS_print_result) { - LOG(INFO) << maybeBatch.ValueOrDie()->ToString(); + LOG(WARNING) << maybeBatch.ValueOrDie()->ToString(); } if (!FLAGS_save_output.empty()) { GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie())); @@ -322,18 +393,18 @@ auto BM_Generic = [](::benchmark::State& state, const auto* task = rawIter->task(); const auto* planNode = rawIter->veloxPlan(); auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); - LOG(INFO) << statsStr; + LOG(WARNING) << statsStr; } } - updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics); + updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); }; -auto BM_ShuffleWrite = [](::benchmark::State& state, - const std::string& inputFile, - RuntimeFactory runtimeFactory, - FileReaderType readerType) { +auto BM_ShuffleWriteRead = [](::benchmark::State& state, + const std::string& inputFile, + RuntimeFactory runtimeFactory, + FileReaderType readerType) { setCpu(state); auto listener = std::make_unique(FLAGS_memory_limit); @@ -341,31 +412,48 @@ auto BM_ShuffleWrite = [](::benchmark::State& state, auto runtime = runtimeFactory(std::move(listener)); WriterMetrics writerMetrics{}; + ReaderMetrics readerMetrics{}; int64_t readInputTime = 0; int64_t elapsedTime = 0; { ScopedTimer timer(&elapsedTime); for (auto _ : state) { auto resultIter = getInputIteratorFromFileReader(inputFile, readerType); - runShuffle(runtime, listenerPtr, resultIter, writerMetrics); + runShuffle(runtime, listenerPtr, resultIter, writerMetrics, readerMetrics, FLAGS_run_shuffle_read); auto reader = static_cast(resultIter->getInputIter()); readInputTime += reader->getCollectBatchTime(); } } - updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics); + updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics, readerMetrics); Runtime::release(runtime); }; int main(int argc, char** argv) { - ::benchmark::Initialize(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, true); + std::ostringstream ss; + ss << "Setting flags from command line args: " << std::endl; + std::vector flags; + google::GetAllFlags(&flags); + auto filename = std::filesystem::path(__FILE__).filename(); + for (const auto& flag : flags) { + if (std::filesystem::path(flag.filename).filename() == filename) { + ss << " FLAGS_" << flag.name << ": default = " << flag.default_value << ", current = " << flag.current_value + << std::endl; + } + } + LOG(WARNING) << ss.str(); + + ::benchmark::Initialize(&argc, argv); + // Init Velox backend. 
auto backendConf = gluten::defaultConf(); auto sessionConf = gluten::defaultConf(); - backendConf.insert({gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)}); + backendConf.insert({gluten::kDebugModeEnabled, std::to_string(FLAGS_debug_mode)}); + backendConf.insert({gluten::kGlogVerboseLevel, std::to_string(FLAGS_v)}); + backendConf.insert({gluten::kGlogSeverityLevel, std::to_string(FLAGS_minloglevel)}); if (!FLAGS_conf.empty()) { abortIfFileNotExists(FLAGS_conf); std::ifstream file(FLAGS_conf); @@ -425,7 +513,7 @@ int main(int argc, char** argv) { std::vector dataFiles{}; if (FLAGS_run_example) { - LOG(INFO) << "Running example..."; + LOG(WARNING) << "Running example..."; dataFiles.resize(2); try { substraitJsonFile = getGeneratedFilePath("example.json"); @@ -484,33 +572,23 @@ int main(int argc, char** argv) { if (!errorMsg.empty()) { LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl - << "If simulating a first stage, the usage is:" << std::endl - << "./generic_benchmark " - << "--plan /absolute-path/to/substrait_json_file " - << "--split /absolute-path/to/split_json_file_1,/abosolute-path/to/split_json_file_2,..." - << "--data /absolute-path/to/data_file_1,/absolute-path/to/data_file_2,..." << std::endl - << "If simulating a middle stage, the usage is:" << std::endl - << "./generic_benchmark " - << "--plan /absolute-path/to/substrait_json_file " - << "--data /absolute-path/to/data_file_1,/absolute-path/to/data_file_2,..."; - LOG(ERROR) << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***"; + << "*** Please check docs/developers/MicroBenchmarks.md for the full usage. ***"; ::benchmark::Shutdown(); std::exit(EXIT_FAILURE); } } - // Check whether input files exist. - LOG(INFO) << "Using substrait json file: " << std::endl << substraitJsonFile; + LOG(WARNING) << "Using substrait json file: " << std::endl << substraitJsonFile; if (!splitFiles.empty()) { - LOG(INFO) << "Using " << splitFiles.size() << " input split file(s): "; + LOG(WARNING) << "Using " << splitFiles.size() << " input split file(s): "; for (const auto& splitFile : splitFiles) { - LOG(INFO) << splitFile; + LOG(WARNING) << splitFile; } } if (!dataFiles.empty()) { - LOG(INFO) << "Using " << dataFiles.size() << " input data file(s): "; + LOG(WARNING) << "Using " << dataFiles.size() << " input data file(s): "; for (const auto& dataFile : dataFiles) { - LOG(INFO) << dataFile; + LOG(WARNING) << dataFile; } } @@ -528,37 +606,28 @@ int main(int argc, char** argv) { setUpBenchmark(bm); \ } while (0) -#define SHUFFLE_WRITE_BENCHMARK(READER_TYPE) \ - do { \ - auto* bm = \ - ::benchmark::RegisterBenchmark("ShuffleWrite", BM_ShuffleWrite, dataFiles[0], runtimeFactory, READER_TYPE) \ - ->MeasureProcessCPUTime() \ - ->UseRealTime(); \ - setUpBenchmark(bm); \ +#define SHUFFLE_WRITE_READ_BENCHMARK(READER_TYPE) \ + do { \ + auto* bm = ::benchmark::RegisterBenchmark( \ + "ShuffleWriteRead", BM_ShuffleWriteRead, dataFiles[0], runtimeFactory, READER_TYPE) \ + ->MeasureProcessCPUTime() \ + ->UseRealTime(); \ + setUpBenchmark(bm); \ } while (0) - LOG(INFO) << "Using options: "; - LOG(INFO) << "threads: " << FLAGS_threads; - LOG(INFO) << "iterations: " << FLAGS_iterations; - LOG(INFO) << "cpu: " << FLAGS_cpu; - LOG(INFO) << "print_result: " << FLAGS_print_result; - LOG(INFO) << "save_output: " << FLAGS_save_output; - LOG(INFO) << "batch_size: " << FLAGS_batch_size; - LOG(INFO) << "write_path: " << FLAGS_write_path; - if (dataFiles.empty()) { GENERIC_BENCHMARK(FileReaderType::kNone); } else { FileReaderType 
readerType; if (FLAGS_scan_mode == "buffered") { readerType = FileReaderType::kBuffered; - LOG(INFO) << "Using buffered mode for reading parquet data."; + LOG(WARNING) << "Using buffered mode for reading parquet data."; } else { readerType = FileReaderType::kStream; - LOG(INFO) << "Using stream mode for reading parquet data."; + LOG(WARNING) << "Using stream mode for reading parquet data."; } if (FLAGS_run_shuffle) { - SHUFFLE_WRITE_BENCHMARK(readerType); + SHUFFLE_WRITE_READ_BENCHMARK(readerType); } else { GENERIC_BENCHMARK(readerType); } diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc index c3baa2f33915..345f9da8e16d 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc @@ -159,7 +159,11 @@ setLocalDirsAndDataFileFromEnv(std::string& dataFile, std::vector& // Set local dirs. auto joinedDirs = std::string(joinedDirsC); // Split local dirs and use thread id to choose one directory for data file. - localDirs = gluten::splitPaths(joinedDirs); + auto dirs = gluten::splitPaths(joinedDirs); + for (const auto& dir : dirs) { + localDirs.push_back(arrow::fs::internal::ConcatAbstractPath(dir, "temp_shuffle_" + generateUuid())); + std::filesystem::create_directory(localDirs.back()); + } size_t id = std::hash{}(std::this_thread::get_id()) % localDirs.size(); ARROW_ASSIGN_OR_RAISE(dataFile, gluten::createTempShuffleFile(localDirs[id])); } else { diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index e165d4a91da8..5c6c4470b2b5 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -1404,7 +1404,7 @@ arrow::Result VeloxHashShuffleWriter::partitionBufferSizeAfterShrink(u arrow::Status VeloxHashShuffleWriter::preAllocPartitionBuffers(uint32_t preAllocBufferSize) { for (auto& pid : partitionUsed_) { auto newSize = std::max(preAllocBufferSize, partition2RowCount_[pid]); - DLOG_IF(INFO, partitionBufferSize_[pid] != newSize) + LOG_IF(WARNING, partitionBufferSize_[pid] != newSize) << "Actual partition buffer size - current: " << partitionBufferSize_[pid] << ", newSize: " << newSize << std::endl; // Make sure the size to be allocated is larger than the size to be filled. diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc index ab93d9a33d04..4d002499c9af 100644 --- a/cpp/velox/shuffle/VeloxShuffleReader.cc +++ b/cpp/velox/shuffle/VeloxShuffleReader.cc @@ -312,8 +312,7 @@ std::shared_ptr VeloxHashShuffleReaderDeserializer::next() { if (hasComplexType_) { uint32_t numRows; GLUTEN_ASSIGN_OR_THROW( - auto arrowBuffers, - BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_)); + auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_)); if (numRows == 0) { // Reach EOS. 
       return nullptr;
@@ -332,7 +331,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
     uint32_t numRows = 0;
     while (!merged_ || merged_->numRows() < batchSize_) {
       GLUTEN_ASSIGN_OR_THROW(
-          arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, memoryPool_, numRows, decompressTime_));
+          arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
       if (numRows == 0) {
         reachEos_ = true;
         break;
       }
@@ -401,7 +400,7 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
   while (cachedRows_ < batchSize_) {
     uint32_t numRows;
     GLUTEN_ASSIGN_OR_THROW(
-        auto arrowBuffers, BlockPayload::deserialize(in_.get(), schema_, codec_, arrowPool_, numRows, decompressTime_));
+        auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));
     if (numRows == 0) {
       reachEos_ = true;
diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md
index 21f222b42690..bd469f34c81c 100644
--- a/docs/developers/MicroBenchmarks.md
+++ b/docs/developers/MicroBenchmarks.md
@@ -320,23 +320,44 @@ cd /path/to/gluten/cpp/build/velox/benchmarks
 --threads 1
 ```
 
-### Run shuffle write task only
+### Run shuffle write/read task only
 
 Developers can only run shuffle write task via specifying `--run-shuffle` and `--data` options. The parquet
 format input will be read from arrow-parquet reader and sent to shuffle writer.
-This option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files.
+The `--run-shuffle` option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files.
 The round-robin partitioner is used by default. Besides, random partitioning can be used for testing purpose.
 By specifying option `--partitioning random`, the partitioner will generate a random partition id for each row.
 
+To evaluate shuffle reader performance, developers can set the `--run-shuffle-read` option to add a read pass after the write task finishes.
+
+The command below runs shuffle write/read in a single thread, using the sort shuffle writer with 40000 partitions and random partition ids.
 ```shell
 cd /path/to/gluten/cpp/build/velox/benchmarks
 ./generic_benchmark \
 --run-shuffle \
+--run-shuffle-read \
 --data /path/to/input_for_shuffle_write.parquet --shuffle-writer sort \
+--partitioning random \
+--shuffle-partitions 40000 \
 --threads 1
 ```
+The output should look like:
+
+```
+-------------------------------------------------------------------------------------------------------------------------
+Benchmark                                                            Time             CPU   Iterations UserCounters...
+-------------------------------------------------------------------------------------------------------------------------
+ShuffleWriteRead/iterations:1/process_time/real_time/threads:1 121637629714 ns 121309450910 ns 1 elapsed_time=121.638G read_input_time=25.2637G shuffle_compress_time=10.8311G shuffle_decompress_time=4.04055G shuffle_deserialize_time=7.24289G shuffle_spill_time=0 shuffle_split_time=69.9098G shuffle_write_time=2.03274G
+```
+
+## Enable debug mode
+
+`spark.gluten.sql.debug` (debug mode) is set to false by default, so the google glog levels are limited to printing only `WARNING` or higher severity logs.
+Unless `spark.gluten.sql.debug` is set in the INI file via `--conf`, the logging behavior is the same as with debug mode off.
+Developers can use the `--debug-mode` command line flag to turn on debug mode when needed, and set the verbosity/severity level via the command line flags `--v` and `--minloglevel`.
+Note that constructing and destructing log strings can be very time-consuming, which may make the benchmark times inaccurate.
+
 
 ## Simulate write tasks
 
 The last operator for a write task is a file write operator, and the output from Velox pipeline only
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 26527e1c81c9..b3b7603acf53 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -212,9 +212,13 @@ object ColumnarShuffleExchangeExec extends Logging {
   }
 
   def useSortBasedShuffle(partitioning: Partitioning, output: Seq[Attribute]): Boolean = {
+    val conf = GlutenConfig.getConf
+    lazy val isCelebornSortBasedShuffle = conf.isUseCelebornShuffleManager &&
+      conf.celebornShuffleWriterType == GlutenConfig.GLUTEN_SORT_SHUFFLE_WRITER
     partitioning != SinglePartition &&
     (partitioning.numPartitions >= GlutenConfig.getConf.columnarShuffleSortPartitionsThreshold ||
-      output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold)
+      output.size >= GlutenConfig.getConf.columnarShuffleSortColumnsThreshold) ||
+    isCelebornSortBasedShuffle
   }
 
 class DummyPairRDDWithPartitions(@transient private val sc: SparkContext, numPartitions: Int)
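The new "Enable debug mode" section in the MicroBenchmarks.md hunk above names the `--debug-mode`, `--v`, and `--minloglevel` flags but does not show them in a single command. A minimal usage sketch, assuming a plan-based run with a hypothetical substrait file at `/path/to/plan.json` (the flag names come from the diff and doc text; the path and the chosen log levels are illustrative only):

```shell
cd /path/to/gluten/cpp/build/velox/benchmarks
./generic_benchmark \
--plan /path/to/plan.json \
--debug-mode \
--v=1 \
--minloglevel=0 \
--threads 1
```

Per the doc text above, the verbosity/severity settings appear to matter only once debug mode is turned on, either via `--debug-mode` or by setting `spark.gluten.sql.debug` in the `--conf` file.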