diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 3a4d6e9e8792..4d7c30402985 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -196,6 +196,7 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS shuffle/Partitioning.cc shuffle/Payload.cc shuffle/rss/RssPartitionWriter.cc + shuffle/RandomPartitioner.cc shuffle/RoundRobinPartitioner.cc shuffle/ShuffleMemoryPool.cc shuffle/ShuffleReader.cc diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 4e069ec7a6d6..1e5326689229 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -756,6 +756,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe throw gluten::GlutenException(std::string("Short partitioning name can't be null")); } + // Build ShuffleWriterOptions. auto shuffleWriterOptions = ShuffleWriterOptions{ .bufferSize = bufferSize, .bufferReallocThreshold = reallocThreshold, @@ -763,7 +764,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe .taskAttemptId = (int64_t)taskAttemptId, .startPartitionId = startPartitionId, }; + auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr, JNI_FALSE); + auto shuffleWriterType = std::string(shuffleWriterTypeC); + env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC); + + if (shuffleWriterType == "sort") { + shuffleWriterOptions.shuffleWriterType = kSortShuffle; + } + // Build PartitionWriterOptions. auto partitionWriterOptions = PartitionWriterOptions{ .mergeBufferSize = mergeBufferSize, .mergeThreshold = mergeThreshold, @@ -779,20 +788,13 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe partitionWriterOptions.codecBackend = getCodecBackend(env, codecBackendJstr); partitionWriterOptions.compressionMode = getCompressionMode(env, compressionModeJstr); } + std::unique_ptr partitionWriter; auto partitionWriterTypeC = env->GetStringUTFChars(partitionWriterTypeJstr, JNI_FALSE); auto partitionWriterType = std::string(partitionWriterTypeC); env->ReleaseStringUTFChars(partitionWriterTypeJstr, partitionWriterTypeC); - auto shuffleWriterTypeC = env->GetStringUTFChars(shuffleWriterTypeJstr, JNI_FALSE); - auto shuffleWriterType = std::string(shuffleWriterTypeC); - env->ReleaseStringUTFChars(shuffleWriterTypeJstr, shuffleWriterTypeC); - - if (shuffleWriterType == "sort") { - shuffleWriterOptions.shuffleWriterType = kSortShuffle; - } - if (partitionWriterType == "local") { if (dataFileJstr == NULL) { throw gluten::GlutenException(std::string("Shuffle DataFile can't be null")); diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index 6c4a0af39a22..f56543bab5e0 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -25,7 +25,6 @@ #include "shuffle/Payload.h" #include "shuffle/Spill.h" #include "shuffle/Utils.h" -#include "utils/Timer.h" namespace gluten { @@ -547,7 +546,7 @@ arrow::Status LocalPartitionWriter::evict( arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, int64_t rawSize, const char* data, int64_t length) { rawPartitionLengths_[partitionId] += rawSize; - if (partitionId <= lastEvictPid_) { + if (partitionId < lastEvictPid_) { RETURN_NOT_OK(finishSpill()); } lastEvictPid_ = partitionId; diff --git a/cpp/core/shuffle/Partitioner.cc b/cpp/core/shuffle/Partitioner.cc index 80b4598a1f17..fb1a5aab44eb 100644 --- a/cpp/core/shuffle/Partitioner.cc +++ b/cpp/core/shuffle/Partitioner.cc @@ -18,6 +18,7 @@ #include 
"shuffle/Partitioner.h" #include "shuffle/FallbackRangePartitioner.h" #include "shuffle/HashPartitioner.h" +#include "shuffle/RandomPartitioner.h" #include "shuffle/RoundRobinPartitioner.h" #include "shuffle/SinglePartitioner.h" @@ -34,6 +35,8 @@ Partitioner::make(Partitioning partitioning, int32_t numPartitions, int32_t star return std::make_shared(); case Partitioning::kRange: return std::make_shared(numPartitions); + case Partitioning::kRandom: + return std::make_shared(numPartitions); default: return arrow::Status::Invalid("Unsupported partitioning type: " + std::to_string(partitioning)); } diff --git a/cpp/core/shuffle/Partitioning.cc b/cpp/core/shuffle/Partitioning.cc index dfe848d63046..84fe6ecd972f 100644 --- a/cpp/core/shuffle/Partitioning.cc +++ b/cpp/core/shuffle/Partitioning.cc @@ -23,6 +23,7 @@ static const std::string kSinglePartitioningName = "single"; static const std::string kRoundRobinPartitioningName = "rr"; static const std::string kHashPartitioningName = "hash"; static const std::string kRangePartitioningName = "range"; +static const std::string kRandomPartitioningName = "random"; } // namespace namespace gluten { @@ -39,6 +40,9 @@ Partitioning toPartitioning(std::string name) { if (name == kRangePartitioningName) { return Partitioning::kRange; } + if (name == kRandomPartitioningName) { + return Partitioning::kRandom; + } throw GlutenException("Invalid partition name: " + name); } diff --git a/cpp/core/shuffle/Partitioning.h b/cpp/core/shuffle/Partitioning.h index 1d65e9d6b993..a60d43561bee 100644 --- a/cpp/core/shuffle/Partitioning.h +++ b/cpp/core/shuffle/Partitioning.h @@ -20,7 +20,7 @@ #include namespace gluten { -enum Partitioning { kSingle, kRoundRobin, kHash, kRange }; +enum Partitioning { kSingle, kRoundRobin, kHash, kRange, kRandom /*for test only*/ }; Partitioning toPartitioning(std::string name); diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index beca3fa02d61..fb91c326b679 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -503,6 +503,7 @@ arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o } arrow::Result> UncompressedDiskBlockPayload::readUncompressedBuffer() { + ScopedTimer timer(&writeTime_); readPos_++; int64_t bufferLength; RETURN_NOT_OK(inputStream_->Read(sizeof(int64_t), &bufferLength)); @@ -525,6 +526,7 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload( : Payload(Type::kCompressed, numRows, isValidityBuffer), inputStream_(inputStream), rawSize_(rawSize) {} arrow::Status CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) { + ScopedTimer timer(&writeTime_); ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_)); RETURN_NOT_OK(outputStream->Write(block)); return arrow::Status::OK(); diff --git a/cpp/core/shuffle/RandomPartitioner.cc b/cpp/core/shuffle/RandomPartitioner.cc new file mode 100644 index 000000000000..06d87be40f7f --- /dev/null +++ b/cpp/core/shuffle/RandomPartitioner.cc @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "shuffle/RandomPartitioner.h" + +namespace gluten { + +arrow::Status gluten::RandomPartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + std::vector<uint32_t>& row2Partition, + std::vector<uint32_t>& partition2RowCount) { + std::fill(std::begin(partition2RowCount), std::end(partition2RowCount), 0); + row2Partition.resize(numRows); + + for (int32_t i = 0; i < numRows; ++i) { + row2Partition[i] = dist_(rng_); + } + + for (auto& pid : row2Partition) { + partition2RowCount[pid]++; + } + + return arrow::Status::OK(); +} + +arrow::Status gluten::RandomPartitioner::compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) { + // Encode the vector index in the high 32 bits and the row index in the low 32 bits. + auto index = static_cast<int64_t>(vectorIndex) << 32; + for (int32_t i = 0; i < numRows; ++i) { + int64_t combined = index | (i & 0xFFFFFFFFLL); + auto& vec = rowVectorIndexMap[dist_(rng_)]; + vec.push_back(combined); + } + + return arrow::Status::OK(); +} + +} // namespace gluten diff --git a/cpp/core/shuffle/RandomPartitioner.h b/cpp/core/shuffle/RandomPartitioner.h new file mode 100644 index 000000000000..77d00716943c --- /dev/null +++ b/cpp/core/shuffle/RandomPartitioner.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#pragma once + +#include <random> +#include "shuffle/Partitioner.h" + +namespace gluten { +class RandomPartitioner final : public Partitioner { + public: + RandomPartitioner(int32_t numPartitions) : Partitioner(numPartitions, false) { + std::random_device dev; + rng_.seed(dev()); + dist_ = std::uniform_int_distribution<int32_t>(0, numPartitions - 1); + } + + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + std::vector<uint32_t>& row2Partition, + std::vector<uint32_t>& partition2RowCount) override; + + arrow::Status compute( + const int32_t* pidArr, + const int64_t numRows, + const int32_t vectorIndex, + std::unordered_map<int32_t, std::vector<int64_t>>& rowVectorIndexMap) override; + + private: + std::mt19937 rng_; + std::uniform_int_distribution<int32_t> dist_; +}; +} // namespace gluten diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index 6d66ea506a7e..4eed625628f3 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -513,6 +513,7 @@ set(VELOX_SRCS operators/serializer/VeloxRowToColumnarConverter.cc operators/writer/VeloxParquetDatasource.cc shuffle/VeloxShuffleReader.cc + shuffle/VeloxShuffleWriter.cc shuffle/VeloxHashBasedShuffleWriter.cc shuffle/VeloxSortBasedShuffleWriter.cc substrait/SubstraitParser.cc diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt index 903ec0d65825..1aa199b13696 100644 --- a/cpp/velox/benchmarks/CMakeLists.txt +++ b/cpp/velox/benchmarks/CMakeLists.txt @@ -39,8 +39,6 @@ add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc) add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc) -add_velox_benchmark(shuffle_split_benchmark ShuffleSplitBenchmark.cc) - if(ENABLE_ORC) add_velox_benchmark(orc_converter exec/OrcConverter.cc) endif() diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index b7a50800e4ea..d8c8c0c24a94 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -31,10 +31,10 @@ #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" #include "shuffle/LocalPartitionWriter.h" -#include "shuffle/VeloxHashBasedShuffleWriter.h" #include "shuffle/VeloxShuffleWriter.h" #include "shuffle/rss/RssPartitionWriter.h" #include "utils/StringUtil.h" +#include "utils/Timer.h" #include "utils/VeloxArrowUtils.h" #include "utils/exception.h" #include "utils/tests/LocalRssClient.h" @@ -47,13 +47,18 @@ namespace { DEFINE_bool(print_result, true, "Print result for execution"); DEFINE_string(save_output, "", "Path to parquet file for saving the task output iterator"); DEFINE_bool(with_shuffle, false, "Add shuffle split at end."); -DEFINE_string(partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single"); +DEFINE_string( + partitioning, + "rr", + "Short partitioning name. Valid options are rr, hash, range, single, random (for test purposes only)"); +DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort"); DEFINE_bool(rss, false, "Mocking rss."); -DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec"); -DEFINE_bool(qat_gzip, false, "Use QAT GZIP as shuffle compression codec"); -DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec"); -DEFINE_bool(iaa_gzip, false, "Use IAA GZIP as shuffle compression codec"); +DEFINE_string( + compression, + "lz4", + "Specify the compression codec.
Valid options are lz4, zstd, qat_gzip, qat_zstd, iaa_gzip"); DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions"); +DEFINE_bool(run_shuffle, false, "Only run shuffle write."); DEFINE_bool(run_example, false, "Run the example and exit."); DEFINE_string(plan, "", "Path to input json file of the substrait plan."); @@ -61,33 +66,61 @@ DEFINE_string( split, "", "Path to input json file of the splits. Only valid for simulating the first stage. Use comma-separated list for multiple splits."); -DEFINE_string(data, "", "Path to input data files in parquet format, used for shuffle read."); +DEFINE_string(data, "", "Path to input data files in parquet format. Use comma-separated list for multiple files."); DEFINE_string(conf, "", "Path to the configuration file."); DEFINE_string(write_path, "/tmp", "Path to save the output from write tasks."); DEFINE_int64(memory_limit, std::numeric_limits<int64_t>::max(), "Memory limit used to trigger spill."); +DEFINE_string( + scan_mode, + "stream", + "Scan mode for reading parquet data. " + "'stream' mode: Input file scan happens inside of the pipeline. " + "'buffered' mode: First read all data into memory and feed the pipeline with it."); struct WriterMetrics { int64_t splitTime; int64_t evictTime; int64_t writeTime; int64_t compressTime; + + public: + explicit WriterMetrics() : splitTime(0), evictTime(0), writeTime(0), compressTime(0) {} }; +void setUpBenchmark(::benchmark::internal::Benchmark* bm) { + if (FLAGS_threads > 0) { + bm->Threads(FLAGS_threads); + } else { + bm->ThreadRange(1, std::thread::hardware_concurrency()); + } + if (FLAGS_iterations > 0) { + bm->Iterations(FLAGS_iterations); + } +} + std::shared_ptr<VeloxShuffleWriter> createShuffleWriter( + Runtime* runtime, VeloxMemoryManager* memoryManager, const std::string& dataFile, const std::vector<std::string>& localDirs) { PartitionWriterOptions partitionWriterOptions{}; - if (FLAGS_zstd) { + + // Configure compression.
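+ // lz4 and zstd use Arrow's software codecs (CodecBackend::NONE), while the qat_* and + // iaa_* options route GZIP/ZSTD compression through the QAT or IAA hardware backends. + // Any other value leaves the PartitionWriterOptions defaults untouched.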
+ if (FLAGS_compression == "lz4") { + partitionWriterOptions.codecBackend = CodecBackend::NONE; + partitionWriterOptions.compressionType = arrow::Compression::LZ4_FRAME; + partitionWriterOptions.compressionTypeStr = "lz4"; + } else if (FLAGS_compression == "zstd") { partitionWriterOptions.codecBackend = CodecBackend::NONE; partitionWriterOptions.compressionType = arrow::Compression::ZSTD; - } else if (FLAGS_qat_gzip) { + partitionWriterOptions.compressionTypeStr = "zstd"; + } else if (FLAGS_compression == "qat_gzip") { partitionWriterOptions.codecBackend = CodecBackend::QAT; partitionWriterOptions.compressionType = arrow::Compression::GZIP; - } else if (FLAGS_qat_zstd) { + } else if (FLAGS_compression == "qat_zstd") { partitionWriterOptions.codecBackend = CodecBackend::QAT; partitionWriterOptions.compressionType = arrow::Compression::ZSTD; - } else if (FLAGS_iaa_gzip) { + } else if (FLAGS_compression == "iaa_gzip") { partitionWriterOptions.codecBackend = CodecBackend::IAA; partitionWriterOptions.compressionType = arrow::Compression::GZIP; } @@ -111,28 +144,84 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter( auto options = ShuffleWriterOptions{}; options.partitioning = gluten::toPartitioning(FLAGS_partitioning); - GLUTEN_ASSIGN_OR_THROW( - auto shuffleWriter, - VeloxHashBasedShuffleWriter::create( - FLAGS_shuffle_partitions, - std::move(partitionWriter), - std::move(options), - memoryManager->getLeafMemoryPool(), - memoryManager->getArrowMemoryPool())); - - return shuffleWriter; + if (FLAGS_shuffle_writer == "sort") { + options.shuffleWriterType = gluten::kSortShuffle; + } + auto shuffleWriter = runtime->createShuffleWriter( + FLAGS_shuffle_partitions, std::move(partitionWriter), std::move(options), memoryManager); + + return std::reinterpret_pointer_cast<VeloxShuffleWriter>(shuffleWriter); } void populateWriterMetrics( const std::shared_ptr<VeloxShuffleWriter>& shuffleWriter, - int64_t shuffleWriteTime, + int64_t totalTime, WriterMetrics& metrics) { metrics.compressTime += shuffleWriter->totalCompressTime(); metrics.evictTime += shuffleWriter->totalEvictTime(); metrics.writeTime += shuffleWriter->totalWriteTime(); - metrics.evictTime += - (shuffleWriteTime - shuffleWriter->totalCompressTime() - shuffleWriter->totalEvictTime() - - shuffleWriter->totalWriteTime()); + auto splitTime = totalTime - metrics.compressTime - metrics.evictTime - metrics.writeTime; + if (splitTime > 0) { + metrics.splitTime += splitTime; + } +} + +void setCpu(::benchmark::State& state) { + // Pin each thread to a different CPU#, starting from 0 or --cpu.
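+ // For example, with --cpu=8 and 4 benchmark threads, threads 0-3 are pinned to CPUs 8-11.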
+ auto cpu = state.thread_index(); + if (FLAGS_cpu != -1) { + cpu += FLAGS_cpu; + } + LOG(INFO) << "Setting CPU for thread " << state.thread_index() << " to " << cpu; + gluten::setCpu(cpu); +} + +void runShuffle( + Runtime* runtime, + VeloxMemoryManager* memoryManager, + BenchmarkAllocationListener* listener, + const std::shared_ptr& resultIter, + WriterMetrics& metrics) { + std::string dataFile; + std::vector localDirs; + bool isFromEnv; + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); + + auto shuffleWriter = createShuffleWriter(runtime, memoryManager, dataFile, localDirs); + listener->setShuffleWriter(shuffleWriter.get()); + + int64_t totalTime = 0; + { + gluten::ScopedTimer timer(&totalTime); + while (resultIter->hasNext()) { + GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit)); + } + GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); + } + + populateWriterMetrics(shuffleWriter, totalTime, metrics); + // Cleanup shuffle outputs + cleanupShuffleOutput(dataFile, localDirs, isFromEnv); +} + +void updateBenchmarkMetrics( + ::benchmark::State& state, + const int64_t& elapsedTime, + const int64_t& readInputTime, + const WriterMetrics& writerMetrics) { + state.counters["read_input_time"] = + benchmark::Counter(readInputTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["elapsed_time"] = + benchmark::Counter(elapsedTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + + state.counters["shuffle_write_time"] = benchmark::Counter( + writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_spill_time"] = benchmark::Counter( + writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_split_time"] = benchmark::Counter( + writerMetrics.splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); + state.counters["shuffle_compress_time"] = benchmark::Counter( + writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); } } // namespace @@ -141,148 +230,140 @@ auto BM_Generic = [](::benchmark::State& state, const std::string& planFile, const std::vector& splitFiles, const std::vector& dataFiles, - const std::unordered_map& conf, + Runtime* runtime, FileReaderType readerType) { - // Pin each threads to different CPU# starting from 0 or --cpu. 
- if (FLAGS_cpu != -1) { - setCpu(FLAGS_cpu + state.thread_index()); - } else { - setCpu(state.thread_index()); - } - memory::MemoryManager::testingSetInstance({}); + setCpu(state); + auto listener = std::make_unique(FLAGS_memory_limit); + auto* listenerPtr = listener.get(); auto memoryManager = std::make_unique( - "generic_benchmark", - gluten::defaultMemoryAllocator(), - std::make_unique(FLAGS_memory_limit)); - auto runtime = Runtime::create(kVeloxRuntimeKind, conf); + "generic_benchmark", gluten::defaultMemoryAllocator(), std::move(listener)); + auto plan = getPlanFromFile("Plan", planFile); std::vector splits{}; for (const auto& splitFile : splitFiles) { splits.push_back(getPlanFromFile("ReadRel.LocalFiles", splitFile)); } - auto startTime = std::chrono::steady_clock::now(); - int64_t collectBatchTime = 0; - WriterMetrics writerMetrics{}; - for (auto _ : state) { - std::vector> inputIters; - std::vector inputItersRaw; - if (!dataFiles.empty()) { - for (const auto& input : dataFiles) { - inputIters.push_back(getInputIteratorFromFileReader(input, readerType)); - } - std::transform( - inputIters.begin(), - inputIters.end(), - std::back_inserter(inputItersRaw), - [](std::shared_ptr iter) { - return static_cast(iter->getInputIter()); - }); - } - runtime->injectWriteFilesTempPath(FLAGS_write_path); - runtime->parsePlan(reinterpret_cast(plan.data()), plan.size(), std::nullopt); - for (auto& split : splits) { - runtime->parseSplitInfo(reinterpret_cast(split.data()), split.size(), std::nullopt); - } - auto resultIter = - runtime->createResultIterator(memoryManager.get(), "/tmp/test-spill", std::move(inputIters), conf); - if (auto listener = dynamic_cast(memoryManager->getListener())) { - listener->setIterator(resultIter.get()); - } - auto veloxPlan = dynamic_cast(runtime)->getVeloxPlan(); - if (FLAGS_with_shuffle) { - int64_t shuffleWriteTime; - TIME_NANO_START(shuffleWriteTime); - std::string dataFile; - std::vector localDirs; - bool isFromEnv; - GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); - const auto& shuffleWriter = createShuffleWriter(memoryManager.get(), dataFile, localDirs); - while (resultIter->hasNext()) { - GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit)); + WriterMetrics writerMetrics{}; + int64_t readInputTime = 0; + int64_t elapsedTime = 0; + + { + ScopedTimer timer(&elapsedTime); + for (auto _ : state) { + std::vector> inputIters; + std::vector inputItersRaw; + if (!dataFiles.empty()) { + for (const auto& input : dataFiles) { + inputIters.push_back(getInputIteratorFromFileReader(input, readerType)); + } + std::transform( + inputIters.begin(), + inputIters.end(), + std::back_inserter(inputItersRaw), + [](std::shared_ptr iter) { + return static_cast(iter->getInputIter()); + }); } - GLUTEN_THROW_NOT_OK(shuffleWriter->stop()); - TIME_NANO_END(shuffleWriteTime); - populateWriterMetrics(shuffleWriter, shuffleWriteTime, writerMetrics); - // Cleanup shuffle outputs - cleanupShuffleOutput(dataFile, localDirs, isFromEnv); - } else { - // May write the output into file. 
- ArrowSchema cSchema; - toArrowSchema(veloxPlan->outputType(), memoryManager->getLeafMemoryPool().get(), &cSchema); - GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&cSchema)); - ArrowWriter writer{FLAGS_save_output}; - state.PauseTiming(); - if (!FLAGS_save_output.empty()) { - GLUTEN_THROW_NOT_OK(writer.initWriter(*(outputSchema.get()))); + runtime->injectWriteFilesTempPath(FLAGS_write_path); + runtime->parsePlan(reinterpret_cast(plan.data()), plan.size(), std::nullopt); + for (auto& split : splits) { + runtime->parseSplitInfo(reinterpret_cast(split.data()), split.size(), std::nullopt); } - state.ResumeTiming(); + auto resultIter = runtime->createResultIterator( + memoryManager.get(), "/tmp/test-spill", std::move(inputIters), runtime->getConfMap()); + listenerPtr->setIterator(resultIter.get()); - while (resultIter->hasNext()) { - auto array = resultIter->next()->exportArrowArray(); + if (FLAGS_with_shuffle) { + runShuffle(runtime, memoryManager.get(), listenerPtr, resultIter, writerMetrics); + } else { + // May write the output into file. + auto veloxPlan = dynamic_cast(runtime)->getVeloxPlan(); + + ArrowSchema cSchema; + toArrowSchema(veloxPlan->outputType(), memoryManager->getLeafMemoryPool().get(), &cSchema); + GLUTEN_ASSIGN_OR_THROW(auto outputSchema, arrow::ImportSchema(&cSchema)); + ArrowWriter writer{FLAGS_save_output}; state.PauseTiming(); - auto maybeBatch = arrow::ImportRecordBatch(array.get(), outputSchema); - if (!maybeBatch.ok()) { - state.SkipWithError(maybeBatch.status().message().c_str()); - return; + if (!FLAGS_save_output.empty()) { + GLUTEN_THROW_NOT_OK(writer.initWriter(*(outputSchema.get()))); } - if (FLAGS_print_result) { - LOG(INFO) << maybeBatch.ValueOrDie()->ToString(); + state.ResumeTiming(); + + while (resultIter->hasNext()) { + auto array = resultIter->next()->exportArrowArray(); + state.PauseTiming(); + auto maybeBatch = arrow::ImportRecordBatch(array.get(), outputSchema); + if (!maybeBatch.ok()) { + state.SkipWithError(maybeBatch.status().message().c_str()); + return; + } + if (FLAGS_print_result) { + LOG(INFO) << maybeBatch.ValueOrDie()->ToString(); + } + if (!FLAGS_save_output.empty()) { + GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie())); + } } + + state.PauseTiming(); if (!FLAGS_save_output.empty()) { - GLUTEN_THROW_NOT_OK(writer.writeInBatches(maybeBatch.ValueOrDie())); + GLUTEN_THROW_NOT_OK(writer.closeWriter()); } + state.ResumeTiming(); } - state.PauseTiming(); - if (!FLAGS_save_output.empty()) { - GLUTEN_THROW_NOT_OK(writer.closeWriter()); - } - state.ResumeTiming(); - } - - collectBatchTime += - std::accumulate(inputItersRaw.begin(), inputItersRaw.end(), 0, [](int64_t sum, FileReaderIterator* iter) { - return sum + iter->getCollectBatchTime(); - }); + readInputTime += + std::accumulate(inputItersRaw.begin(), inputItersRaw.end(), 0, [](int64_t sum, FileReaderIterator* iter) { + return sum + iter->getCollectBatchTime(); + }); - auto* rawIter = static_cast(resultIter->getInputIter()); - const auto* task = rawIter->task(); - const auto* planNode = rawIter->veloxPlan(); - auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); - LOG(INFO) << statsStr; + auto* rawIter = static_cast(resultIter->getInputIter()); + const auto* task = rawIter->task(); + const auto* planNode = rawIter->veloxPlan(); + auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); + LOG(INFO) << statsStr; + } } - Runtime::release(runtime); - auto endTime = 
std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(endTime - startTime).count(); + updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics); +}; - state.counters["collect_batch_time"] = - benchmark::Counter(collectBatchTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["elapsed_time"] = - benchmark::Counter(duration, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_write_time"] = benchmark::Counter( - writerMetrics.writeTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_spill_time"] = benchmark::Counter( - writerMetrics.evictTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_split_time"] = benchmark::Counter( - writerMetrics.splitTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); - state.counters["shuffle_compress_time"] = benchmark::Counter( - writerMetrics.compressTime, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1000); +auto BM_ShuffleWrite = + [](::benchmark::State& state, const std::string& inputFile, Runtime* runtime, FileReaderType readerType) { + setCpu(state); + + auto listener = std::make_unique(FLAGS_memory_limit); + auto* listenerPtr = listener.get(); + auto memoryManager = std::make_unique( + "generic_benchmark", gluten::defaultMemoryAllocator(), std::move(listener)); + + WriterMetrics writerMetrics{}; + int64_t readInputTime = 0; + int64_t elapsedTime = 0; + { + ScopedTimer timer(&elapsedTime); + for (auto _ : state) { + auto resultIter = getInputIteratorFromFileReader(inputFile, readerType); + runShuffle(runtime, memoryManager.get(), listenerPtr, resultIter, writerMetrics); + + auto reader = static_cast(resultIter->getInputIter()); + readInputTime += reader->getCollectBatchTime(); + } + } - gluten::VeloxBackend::get()->tearDown(); -}; + updateBenchmarkMetrics(state, elapsedTime, readInputTime, writerMetrics); + }; int main(int argc, char** argv) { ::benchmark::Initialize(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, true); // Init Velox backend. - std::unordered_map backendConf; - std::unordered_map sessionConf; + auto backendConf = gluten::defaultConf(); + auto sessionConf = gluten::defaultConf(); backendConf.insert({gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)}); - backendConf.insert({kDebugModeEnabled, "true"}); if (!FLAGS_conf.empty()) { abortIfFileNotExists(FLAGS_conf); std::ifstream file(FLAGS_conf); @@ -334,6 +415,7 @@ int main(int argc, char** argv) { } initVeloxBackend(backendConf); + memory::MemoryManager::testingSetInstance({}); // Parse substrait plan, split file and data files. 
std::string substraitJsonFile = FLAGS_plan; @@ -352,6 +434,28 @@ int main(int argc, char** argv) { ::benchmark::Shutdown(); std::exit(EXIT_FAILURE); } + } else if (FLAGS_run_shuffle) { + std::string errorMsg{}; + if (FLAGS_data.empty()) { + errorMsg = "Missing '--data' option."; + } else if (FLAGS_partitioning != "rr" && FLAGS_partitioning != "random") { + errorMsg = "--run-shuffle only supports round-robin partitioning and random partitioning."; + } + if (errorMsg.empty()) { + try { + dataFiles = gluten::splitPaths(FLAGS_data, true); + if (dataFiles.size() > 1) { + errorMsg = "Only one data file is allowed for shuffle write."; + } + } catch (const std::exception& e) { + errorMsg = e.what(); + } + } + if (!errorMsg.empty()) { + LOG(ERROR) << "Incorrect usage: " << errorMsg << std::endl; + ::benchmark::Shutdown(); + std::exit(EXIT_FAILURE); + } } else { // Validate input args. std::string errorMsg{}; @@ -363,15 +467,17 @@ int main(int argc, char** argv) { errorMsg = "Missing '--split' or '--data' option."; } - try { - if (!FLAGS_data.empty()) { - dataFiles = gluten::splitPaths(FLAGS_data, true); - } - if (!FLAGS_split.empty()) { - splitFiles = gluten::splitPaths(FLAGS_split, true); + if (errorMsg.empty()) { + try { + if (!FLAGS_data.empty()) { + dataFiles = gluten::splitPaths(FLAGS_data, true); + } + if (!FLAGS_split.empty()) { + splitFiles = gluten::splitPaths(FLAGS_split, true); + } + } catch (const std::exception& e) { + errorMsg = e.what(); } - } catch (const std::exception& e) { - errorMsg = e.what(); } if (!errorMsg.empty()) { @@ -406,20 +512,23 @@ int main(int argc, char** argv) { } } -#define GENERIC_BENCHMARK(NAME, READER_TYPE) \ - do { \ - auto* bm = ::benchmark::RegisterBenchmark( \ - NAME, BM_Generic, substraitJsonFile, splitFiles, dataFiles, sessionConf, READER_TYPE) \ - ->MeasureProcessCPUTime() \ - ->UseRealTime(); \ - if (FLAGS_threads > 0) { \ - bm->Threads(FLAGS_threads); \ - } else { \ - bm->ThreadRange(1, std::thread::hardware_concurrency()); \ - } \ - if (FLAGS_iterations > 0) { \ - bm->Iterations(FLAGS_iterations); \ - } \ + auto runtime = Runtime::create(kVeloxRuntimeKind, sessionConf); + +#define GENERIC_BENCHMARK(READER_TYPE) \ + do { \ + auto* bm = ::benchmark::RegisterBenchmark( \ + "GenericBenchmark", BM_Generic, substraitJsonFile, splitFiles, dataFiles, runtime, READER_TYPE) \ + ->MeasureProcessCPUTime() \ + ->UseRealTime(); \ + setUpBenchmark(bm); \ + } while (0) + +#define SHUFFLE_WRITE_BENCHMARK(READER_TYPE) \ + do { \ + auto* bm = ::benchmark::RegisterBenchmark("ShuffleWrite", BM_ShuffleWrite, dataFiles[0], runtime, READER_TYPE) \ + ->MeasureProcessCPUTime() \ + ->UseRealTime(); \ + setUpBenchmark(bm); \ } while (0) LOG(INFO) << "Using options: "; @@ -432,14 +541,28 @@ int main(int argc, char** argv) { LOG(INFO) << "write_path: " << FLAGS_write_path; if (dataFiles.empty()) { - GENERIC_BENCHMARK("SkipInput", FileReaderType::kNone); + GENERIC_BENCHMARK(FileReaderType::kNone); } else { - GENERIC_BENCHMARK("InputFromBatchVector", FileReaderType::kBuffered); - GENERIC_BENCHMARK("InputFromBatchStream", FileReaderType::kStream); + FileReaderType readerType; + if (FLAGS_scan_mode == "buffered") { + readerType = FileReaderType::kBuffered; + LOG(INFO) << "Using buffered mode for reading parquet data."; + } else { + readerType = FileReaderType::kStream; + LOG(INFO) << "Using stream mode for reading parquet data."; + } + if (FLAGS_run_shuffle) { + SHUFFLE_WRITE_BENCHMARK(readerType); + } else { + GENERIC_BENCHMARK(readerType); + } }
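+ // Run all registered benchmarks; the runtime and Velox backend are released after shutdown.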
::benchmark::RunSpecifiedBenchmarks(); ::benchmark::Shutdown(); + Runtime::release(runtime); + gluten::VeloxBackend::get()->tearDown(); + return 0; } diff --git a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc index 7e9959797390..894c35351f17 100644 --- a/cpp/velox/benchmarks/ParquetWriteBenchmark.cc +++ b/cpp/velox/benchmarks/ParquetWriteBenchmark.cc @@ -307,7 +307,7 @@ class GoogleBenchmarkVeloxParquetWriteCacheScanBenchmark : public GoogleBenchmar // GoogleBenchmarkArrowParquetWriteCacheScanBenchmark usage // ./parquet_write_benchmark --threads=1 --file /mnt/DP_disk1/int.parquet --output /tmp/parquet-write int main(int argc, char** argv) { - initVeloxBackend(); + gluten::initVeloxBackend(); uint32_t iterations = 1; uint32_t threads = 1; std::string datafile; diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc deleted file mode 100644 index 4a4bb69b8d78..000000000000 --- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc +++ /dev/null @@ -1,394 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "benchmarks/common/BenchmarkUtils.h" -#include "memory/ColumnarBatch.h" -#include "shuffle/LocalPartitionWriter.h" -#include "shuffle/VeloxHashBasedShuffleWriter.h" -#include "shuffle/VeloxShuffleWriter.h" -#include "utils/TestUtils.h" -#include "utils/VeloxArrowUtils.h" -#include "utils/macros.h" - -void printTrace(void) { - char** strings; - size_t i, size; - enum Constexpr { kMaxSize = 1024 }; - void* array[kMaxSize]; - size = backtrace(array, kMaxSize); - strings = backtrace_symbols(array, size); - for (i = 0; i < size; i++) - printf(" %s\n", strings[i]); - puts(""); - free(strings); -} - -using arrow::RecordBatchReader; -using arrow::Status; - -using gluten::GlutenException; -using gluten::ShuffleWriterOptions; -using gluten::VeloxShuffleWriter; - -DEFINE_int32(partitions, -1, "Shuffle partitions"); -DEFINE_string(file, "", "Input file to split"); - -namespace gluten { - -const uint16_t kBatchBufferSize = 4096; -const uint16_t kPartitionBufferSize = 4096; - -class BenchmarkShuffleSplit { - public: - BenchmarkShuffleSplit(std::string fileName) { - getRecordBatchReader(fileName); - } - - void getRecordBatchReader(const std::string& inputFile) { - std::unique_ptr<::parquet::arrow::FileReader> parquetReader; - std::shared_ptr recordBatchReader; - - std::shared_ptr fs; - std::string fileName; - GLUTEN_ASSIGN_OR_THROW(fs, arrow::fs::FileSystemFromUriOrPath(inputFile, &fileName)) - - GLUTEN_ASSIGN_OR_THROW(file_, fs->OpenInputFile(fileName)); - - properties_.set_batch_size(kBatchBufferSize); - properties_.set_pre_buffer(false); - properties_.set_use_threads(false); - - GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader)); - - GLUTEN_THROW_NOT_OK(parquetReader->GetSchema(&schema_)); - - auto numRowgroups = parquetReader->num_row_groups(); - - for (int i = 0; i < numRowgroups; ++i) { - rowGroupIndices_.push_back(i); - } - - auto numColumns = schema_->num_fields(); - for (int i = 0; i < numColumns; ++i) { - columnIndices_.push_back(i); - } - } - - void operator()(benchmark::State& state) { - if (FLAGS_cpu != -1) { - setCpu(FLAGS_cpu + state.thread_index()); - } else { - setCpu(state.thread_index()); - } - - auto options = ShuffleWriterOptions{}; - options.bufferSize = kPartitionBufferSize; - options.partitioning = Partitioning::kRoundRobin; - std::string dataFile; - std::vector localDirs; - bool isFromEnv; - GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv)); - - std::shared_ptr shuffleWriter; - int64_t elapseRead = 0; - int64_t numBatches = 0; - int64_t numRows = 0; - int64_t splitTime = 0; - auto startTime = std::chrono::steady_clock::now(); - - doSplit( - shuffleWriter, - elapseRead, - numBatches, - numRows, - splitTime, - FLAGS_partitions, - std::move(options), - dataFile, - localDirs, - state); - auto endTime = std::chrono::steady_clock::now(); - auto totalTime = (endTime - startTime).count(); - - cleanupShuffleOutput(dataFile, localDirs, isFromEnv); - - state.SetBytesProcessed(int64_t(shuffleWriter->rawPartitionBytes())); - - state.counters["rowgroups"] = - benchmark::Counter(rowGroupIndices_.size(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["columns"] = - benchmark::Counter(columnIndices_.size(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - 
state.counters["batches"] = - benchmark::Counter(numBatches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_rows"] = - benchmark::Counter(numRows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = - benchmark::Counter(FLAGS_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["batch_buffer_size"] = - benchmark::Counter(kBatchBufferSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024); - state.counters["split_buffer_size"] = - benchmark::Counter(kPartitionBufferSize, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024); - - state.counters["bytes_spilled"] = benchmark::Counter( - shuffleWriter->totalBytesEvicted(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024); - state.counters["bytes_written"] = benchmark::Counter( - shuffleWriter->totalBytesWritten(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024); - state.counters["bytes_raw"] = benchmark::Counter( - shuffleWriter->rawPartitionBytes(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1024); - - state.counters["parquet_parse"] = - benchmark::Counter(elapseRead, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["write_time"] = benchmark::Counter( - shuffleWriter->totalWriteTime(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["spill_time"] = benchmark::Counter( - shuffleWriter->totalEvictTime(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compress_time"] = benchmark::Counter( - shuffleWriter->totalCompressTime(), benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - - splitTime = splitTime - shuffleWriter->totalEvictTime() - shuffleWriter->totalCompressTime() - - shuffleWriter->totalWriteTime(); - - state.counters["split_time"] = - benchmark::Counter(splitTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - - state.counters["total_time"] = - benchmark::Counter(totalTime, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - shuffleWriter.reset(); - } - - protected: - long setCpu(uint32_t cpuindex) { - cpu_set_t cs; - CPU_ZERO(&cs); - CPU_SET(cpuindex, &cs); - return sched_setaffinity(0, sizeof(cs), &cs); - } - - virtual void doSplit( - std::shared_ptr& shuffleWriter, - int64_t& elapseRead, - int64_t& numBatches, - int64_t& numRows, - int64_t& splitTime, - const int numPartitions, - ShuffleWriterOptions options, - const std::string& dataFile, - const std::vector& localDirs, - benchmark::State& state) {} - - protected: - std::shared_ptr file_; - std::vector rowGroupIndices_; - std::vector columnIndices_; - std::shared_ptr schema_; - parquet::ArrowReaderProperties properties_; -}; - -class BenchmarkShuffleSplitCacheScanBenchmark : public BenchmarkShuffleSplit { - public: - BenchmarkShuffleSplitCacheScanBenchmark(std::string filename) : BenchmarkShuffleSplit(filename) {} - - protected: - void doSplit( - std::shared_ptr& shuffleWriter, - int64_t& elapseRead, - int64_t& numBatches, - int64_t& numRows, - int64_t& splitTime, - const int numPartitions, - ShuffleWriterOptions options, - const std::string& dataFile, - const std::vector& localDirs, - benchmark::State& state) { - std::vector localColumnIndices; - // local_column_indices.push_back(0); - /* local_column_indices.push_back(0); - local_column_indices.push_back(1); - 
local_column_indices.push_back(2); - local_column_indices.push_back(4); - local_column_indices.push_back(5); - local_column_indices.push_back(6); - local_column_indices.push_back(7); -*/ - localColumnIndices.push_back(8); - localColumnIndices.push_back(9); - localColumnIndices.push_back(13); - localColumnIndices.push_back(14); - localColumnIndices.push_back(15); - - std::shared_ptr localSchema; - arrow::FieldVector fields; - fields.push_back(schema_->field(8)); - fields.push_back(schema_->field(9)); - fields.push_back(schema_->field(13)); - fields.push_back(schema_->field(14)); - fields.push_back(schema_->field(15)); - localSchema = std::make_shared(fields); - - if (state.thread_index() == 0) - LOG(INFO) << localSchema->ToString(); - - auto partitionWriter = std::make_unique( - numPartitions, PartitionWriterOptions{}, defaultArrowMemoryPool().get(), dataFile, localDirs); - GLUTEN_ASSIGN_OR_THROW( - shuffleWriter, - VeloxHashBasedShuffleWriter::create( - numPartitions, - std::move(partitionWriter), - std::move(options), - defaultLeafVeloxMemoryPool(), - defaultArrowMemoryPool().get())); - - std::shared_ptr recordBatch; - - std::unique_ptr<::parquet::arrow::FileReader> parquetReader; - std::shared_ptr recordBatchReader; - GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make( - defaultArrowMemoryPool().get(), ::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader)); - - std::vector> batches; - GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, localColumnIndices, &recordBatchReader)); - do { - TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch)); - - if (recordBatch) { - batches.push_back(recordBatch); - numBatches += 1; - numRows += recordBatch->num_rows(); - } - } while (recordBatch); - LOG(INFO) << "parquet parse done elapsed time " << elapseRead / 1000000 << " ms "; - LOG(INFO) << "batches = " << numBatches << " rows = " << numRows; - - for (auto _ : state) { - for_each( - batches.cbegin(), - batches.cend(), - [&shuffleWriter, &splitTime](const std::shared_ptr& recordBatch) { - std::shared_ptr cb; - ARROW_ASSIGN_OR_THROW(cb, recordBatch2VeloxColumnarBatch(*recordBatch)); - TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb, ShuffleWriter::kMinMemLimit)); - }); - // LOG(INFO) << " split done memory allocated = " << - // options.memoryPool->bytes_allocated(); - } - - TIME_NANO_OR_THROW(splitTime, shuffleWriter->stop()); - } -}; - -class BenchmarkShuffleSplitIterateScanBenchmark : public BenchmarkShuffleSplit { - public: - BenchmarkShuffleSplitIterateScanBenchmark(std::string filename) : BenchmarkShuffleSplit(filename) {} - - protected: - void doSplit( - std::shared_ptr& shuffleWriter, - int64_t& elapseRead, - int64_t& numBatches, - int64_t& numRows, - int64_t& splitTime, - const int numPartitions, - ShuffleWriterOptions options, - const std::string& dataFile, - const std::vector& localDirs, - benchmark::State& state) { - if (state.thread_index() == 0) - LOG(INFO) << schema_->ToString(); - - auto partitionWriter = std::make_unique( - numPartitions, PartitionWriterOptions{}, defaultArrowMemoryPool().get(), dataFile, localDirs); - GLUTEN_ASSIGN_OR_THROW( - shuffleWriter, - VeloxHashBasedShuffleWriter::create( - numPartitions, - std::move(partitionWriter), - std::move(options), - defaultLeafVeloxMemoryPool(), - defaultArrowMemoryPool().get())); - - std::shared_ptr recordBatch; - - std::unique_ptr<::parquet::arrow::FileReader> parquetReader; - std::shared_ptr recordBatchReader; - 
GLUTEN_THROW_NOT_OK(::parquet::arrow::FileReader::Make( - defaultArrowMemoryPool().get(), ::parquet::ParquetFileReader::Open(file_), properties_, &parquetReader)); - - for (auto _ : state) { - std::vector> batches; - GLUTEN_THROW_NOT_OK(parquetReader->GetRecordBatchReader(rowGroupIndices_, columnIndices_, &recordBatchReader)); - TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch)); - while (recordBatch) { - numBatches += 1; - numRows += recordBatch->num_rows(); - std::shared_ptr cb; - ARROW_ASSIGN_OR_THROW(cb, recordBatch2VeloxColumnarBatch(*recordBatch)); - TIME_NANO_OR_THROW(splitTime, shuffleWriter->write(cb, ShuffleWriter::kMinMemLimit)); - TIME_NANO_OR_THROW(elapseRead, recordBatchReader->ReadNext(&recordBatch)); - } - } - TIME_NANO_OR_THROW(splitTime, shuffleWriter->stop()); - } -}; - -} // namespace gluten - -int main(int argc, char** argv) { - benchmark::Initialize(&argc, argv); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - if (FLAGS_file.size() == 0) { - LOG(WARNING) << "No input data file. Please specify via argument --file"; - } - - if (FLAGS_partitions == -1) { - FLAGS_partitions = std::thread::hardware_concurrency(); - } - - gluten::BenchmarkShuffleSplitIterateScanBenchmark iterateScanBenchmark(FLAGS_file); - - auto bm = benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", iterateScanBenchmark) - ->ReportAggregatesOnly(false) - ->MeasureProcessCPUTime() - ->Unit(benchmark::kSecond); - - if (FLAGS_threads > 0) { - bm->Threads(FLAGS_threads); - } else { - bm->ThreadRange(1, std::thread::hardware_concurrency()); - } - if (FLAGS_iterations > 0) { - bm->Iterations(FLAGS_iterations); - } - - benchmark::RunSpecifiedBenchmarks(); - benchmark::Shutdown(); -} diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc index a9f6f0838cfa..c3baa2f33915 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc @@ -31,14 +31,18 @@ DEFINE_int32(cpu, -1, "Run benchmark on specific CPU"); DEFINE_int32(threads, 1, "The number of threads to run this benchmark"); DEFINE_int32(iterations, 1, "The number of iterations to run this benchmark"); +namespace gluten { namespace { +std::unordered_map bmConfMap = defaultConf(); +} -std::unordered_map bmConfMap = {{gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)}}; - -} // namespace +std::unordered_map defaultConf() { + return { + {gluten::kSparkBatchSize, std::to_string(FLAGS_batch_size)}, + }; +} void initVeloxBackend(std::unordered_map& conf) { - conf[gluten::kGlogSeverityLevel] = "0"; gluten::VeloxBackend::create(conf); } @@ -190,9 +194,18 @@ void BenchmarkAllocationListener::allocationChanged(int64_t diff) { velox::succinctBytes(diff), velox::succinctBytes(usedBytes_)); auto neededBytes = usedBytes_ + diff - limit_; - auto spilledBytes = iterator_->spillFixedSize(neededBytes); + int64_t spilledBytes = 0; + if (iterator_) { + spilledBytes += iterator_->spillFixedSize(neededBytes); + } + if (spilledBytes < neededBytes && shuffleWriter_) { + int64_t reclaimed = 0; + GLUTEN_THROW_NOT_OK(shuffleWriter_->reclaimFixedSize(neededBytes - spilledBytes, &reclaimed)); + spilledBytes += reclaimed; + } LOG(INFO) << fmt::format("spill finish, got {}.", velox::succinctBytes(spilledBytes)); } else { usedBytes_ += diff; } } +} // namespace gluten diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h b/cpp/velox/benchmarks/common/BenchmarkUtils.h index ff5e675f74ce..181e56807bcd 100644 --- 
a/cpp/velox/benchmarks/common/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h @@ -32,6 +32,7 @@ #include "memory/VeloxColumnarBatch.h" #include "memory/VeloxMemoryManager.h" #include "shuffle/Options.h" +#include "shuffle/ShuffleWriter.h" #include "utils/VeloxArrowUtils.h" #include "utils/exception.h" #include "velox/common/memory/Memory.h" @@ -41,6 +42,10 @@ DECLARE_int32(cpu); DECLARE_int32(threads); DECLARE_int32(iterations); +namespace gluten { + +std::unordered_map defaultConf(); + /// Initialize the Velox backend with default value. void initVeloxBackend(); @@ -111,10 +116,16 @@ class BenchmarkAllocationListener final : public gluten::AllocationListener { iterator_ = iterator; } + void setShuffleWriter(gluten::ShuffleWriter* shuffleWriter) { + shuffleWriter_ = shuffleWriter; + } + void allocationChanged(int64_t diff) override; private: uint64_t usedBytes_{0L}; uint64_t limit_{0L}; - gluten::ResultIterator* iterator_; + gluten::ResultIterator* iterator_{nullptr}; + gluten::ShuffleWriter* shuffleWriter_{nullptr}; }; +} // namespace gluten diff --git a/cpp/velox/benchmarks/common/FileReaderIterator.h b/cpp/velox/benchmarks/common/FileReaderIterator.h index 3fa94b6afba5..16db58ce4569 100644 --- a/cpp/velox/benchmarks/common/FileReaderIterator.h +++ b/cpp/velox/benchmarks/common/FileReaderIterator.h @@ -38,8 +38,6 @@ class FileReaderIterator : public ColumnarBatchIterator { virtual ~FileReaderIterator() = default; - virtual void createReader() = 0; - virtual std::shared_ptr getSchema() = 0; int64_t getCollectBatchTime() const { diff --git a/cpp/velox/benchmarks/common/ParquetReaderIterator.h b/cpp/velox/benchmarks/common/ParquetReaderIterator.h index e654dc1897b2..6d162e4b68d5 100644 --- a/cpp/velox/benchmarks/common/ParquetReaderIterator.h +++ b/cpp/velox/benchmarks/common/ParquetReaderIterator.h @@ -27,7 +27,7 @@ class ParquetReaderIterator : public FileReaderIterator { public: explicit ParquetReaderIterator(const std::string& path) : FileReaderIterator(path) {} - void createReader() override { + void createReader() { parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties(); properties.set_batch_size(FLAGS_batch_size); GLUTEN_THROW_NOT_OK(parquet::arrow::FileReader::Make( diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index 44f04ef31ae7..738ce99a3bc7 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -35,7 +35,9 @@ #include "utils/VeloxArrowUtils.h" #ifdef ENABLE_HDFS + #include "operators/writer/VeloxParquetDatasourceHDFS.h" + #endif #ifdef ENABLE_S3 @@ -189,17 +191,15 @@ std::shared_ptr VeloxRuntime::createShuffleWriter( auto ctxPool = getLeafVeloxPool(memoryManager); auto arrowPool = memoryManager->getArrowMemoryPool(); std::shared_ptr shuffleWriter; - if (options.shuffleWriterType == kHashShuffle) { - GLUTEN_ASSIGN_OR_THROW( - shuffleWriter, - VeloxHashBasedShuffleWriter::create( - numPartitions, std::move(partitionWriter), std::move(options), ctxPool, arrowPool)); - } else if (options.shuffleWriterType == kSortShuffle) { - GLUTEN_ASSIGN_OR_THROW( - shuffleWriter, - VeloxSortBasedShuffleWriter::create( - numPartitions, std::move(partitionWriter), std::move(options), ctxPool, arrowPool)); - } + GLUTEN_ASSIGN_OR_THROW( + shuffleWriter, + VeloxShuffleWriter::create( + options.shuffleWriterType, + numPartitions, + std::move(partitionWriter), + std::move(options), + ctxPool, + arrowPool)); return shuffleWriter; } diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.cc 
b/cpp/velox/shuffle/VeloxShuffleWriter.cc new file mode 100644 index 000000000000..4b4f73f9463c --- /dev/null +++ b/cpp/velox/shuffle/VeloxShuffleWriter.cc @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "shuffle/VeloxShuffleWriter.h" +#include "shuffle/VeloxHashBasedShuffleWriter.h" +#include "shuffle/VeloxSortBasedShuffleWriter.h" + +namespace gluten { +arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxShuffleWriter::create( + ShuffleWriterType type, + uint32_t numPartitions, + std::unique_ptr<PartitionWriter> partitionWriter, + ShuffleWriterOptions options, + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + arrow::MemoryPool* arrowPool) { + std::shared_ptr<VeloxShuffleWriter> shuffleWriter; + switch (type) { + case kHashShuffle: + return VeloxHashBasedShuffleWriter::create( + numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool); + case kSortShuffle: + return VeloxSortBasedShuffleWriter::create( + numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool); + default: + return arrow::Status::Invalid("Unsupported shuffle writer type: ", std::to_string(type)); + } +} +} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 7318867fc590..0b49789c6d89 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -37,6 +37,7 @@ #include #include "memory/VeloxMemoryManager.h" +#include "shuffle/Options.h" #include "shuffle/PartitionWriter.h" #include "shuffle/Partitioner.h" #include "shuffle/ShuffleWriter.h" @@ -48,6 +49,14 @@ namespace gluten { class VeloxShuffleWriter : public ShuffleWriter { public: + static arrow::Result<std::shared_ptr<VeloxShuffleWriter>> create( + ShuffleWriterType type, + uint32_t numPartitions, + std::unique_ptr<PartitionWriter> partitionWriter, + ShuffleWriterOptions options, + std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool, + arrow::MemoryPool* arrowPool); + facebook::velox::RowVectorPtr getStrippedRowVector(const facebook::velox::RowVector& rv) { // get new row type auto& rowType = rv.type()->asRow(); diff --git a/docs/developers/MicroBenchmarks.md b/docs/developers/MicroBenchmarks.md index 7fc2a535dcf1..21f222b42690 100644 --- a/docs/developers/MicroBenchmarks.md +++ b/docs/developers/MicroBenchmarks.md @@ -250,6 +250,93 @@ cd /path/to/gluten/cpp/build/velox/benchmarks --threads 1 --noprint-result --with-shuffle ``` +Developers can leverage the `--with-shuffle` option to benchmark the shuffle-write process by creating +a simple pipeline of `table scan + shuffle write` in Gluten. This can be done by dumping the micro benchmark +inputs from a first stage. The steps are demonstrated below: + +1. Start spark-shell or pyspark + +We need to set `spark.gluten.sql.benchmark_task.stageId` and `spark.gluten.saveDir` to dump the inputs. +Normally, the stage id should be greater than 0.
You can run the command in step 2 in advance to get the +right stage id in your case. Set `spark.default.parallelism` to 1 and `spark.sql.files.maxPartitionBytes` +large enough to make sure there will be only 1 task in the first stage. + +``` +# Start pyspark +./bin/pyspark --master local[*] \ +--conf spark.gluten.sql.benchmark_task.stageId=1 \ +--conf spark.gluten.saveDir=/path/to/saveDir \ +--conf spark.default.parallelism=1 \ +--conf spark.sql.files.maxPartitionBytes=10g +... # omit other spark & gluten config +``` + +2. Run the table-scan command to dump the plan for the first stage + +If simulating single or round-robin partitioning, the first stage can only have the table scan operator. + +``` +>>> spark.read.format("parquet").load("file:///example.parquet").show() +``` + +If simulating hash partitioning, there will be a projection for generating the hash partitioning key. +Therefore, we need to explicitly run `repartition` to generate the `scan + project` pipeline for the first stage. +Note that using a different number of shuffle partitions here doesn't change the generated pipeline. + +``` +>>> spark.read.format("parquet").load("file:///example.parquet").repartition(10, "key1", "key2").show() +``` + +Simulating range partitioning is not supported. + +3. Run the micro benchmark with dumped inputs + +General configurations for shuffle write: + +- `--with-shuffle`: Add the shuffle write process at the end of the pipeline. +- `--shuffle-writer`: Specify the shuffle writer type. Valid options are sort and hash. Default is hash. +- `--partitioning`: Specify the partitioning type. Valid options are rr, hash and single. Default is rr. + The partitioning type should match the command in step 2. +- `--shuffle-partitions`: Specify the number of shuffle partitions. +- `--compression`: By default, the compression codec for shuffle outputs is lz4. You can switch to other compression codecs + or use hardware accelerators. Valid options are: lz4, zstd, qat_gzip, qat_zstd and iaa_gzip. The compression level is fixed (default compression level 1). + + Note that using the QAT or IAA codec requires Gluten cpp to be built with these features. + Please check the corresponding section in the [Velox document](../get-started/Velox.md) first for how to + set up, build and enable these features in Gluten. For QAT support, please + check [Intel® QuickAssist Technology (QAT) support](../get-started/Velox.md#intel-quickassist-technology-qat-support). + For IAA support, please + check [Intel® In-memory Analytics Accelerator (IAA/IAX) support](../get-started/Velox.md#intel-in-memory-analytics-accelerator-iaaiax-support) + +```shell +cd /path/to/gluten/cpp/build/velox/benchmarks +./generic_benchmark \ +--plan /path/to/saveDir/plan_1_0.json \ +--conf /path/to/saveDir/conf_1_0.ini \ +--split /path/to/saveDir/split_1_0_0.json \ +--with-shuffle \ +--shuffle-writer sort \ +--partitioning hash \ +--threads 1 +``` + +### Run shuffle write task only + +Developers can run only the shuffle write task by specifying the `--run-shuffle` and `--data` options. +The parquet-format input will be read via the Arrow Parquet reader and sent to the shuffle writer. +This option is similar to the `--with-shuffle` option, but it doesn't require the plan and split files. +The round-robin partitioner is used by default. Besides, random partitioning can be used for testing purposes. +By specifying `--partitioning random`, the partitioner will generate a random partition id for each row.
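+ +Conceptually, the random partitioner added in this change (`cpp/core/shuffle/RandomPartitioner.cc`) just draws a uniform partition id per row. A minimal standalone sketch of that row-to-partition assignment (illustrative only, not the class itself): + +```cpp +#include <cstdint> +#include <random> +#include <vector> + +// Assigns each row to a uniformly random partition in [0, numPartitions). +std::vector<int32_t> randomPartitionIds(int64_t numRows, int32_t numPartitions) { + std::mt19937 rng{std::random_device{}()}; + std::uniform_int_distribution<int32_t> dist(0, numPartitions - 1); + std::vector<int32_t> ids(numRows); + for (auto& id : ids) { + id = dist(rng); // each row goes to a uniformly random reducer + } + return ids; +} +```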
+ +```shell +cd /path/to/gluten/cpp/build/velox/benchmarks +./generic_benchmark \ +--run-shuffle \ +--data /path/to/input_for_shuffle_write.parquet \ +--shuffle-writer sort \ +--threads 1 +``` + ## Simulate write tasks The last operator for a write task is a file write operator, and the output from Velox pipeline only @@ -265,20 +352,6 @@ cd /path/to/gluten/cpp/build/velox/benchmarks --write-path /absolute_path/ ``` -By default, the compression codec for shuffle outputs is LZ4. You can switch to other codecs by -adding one of the following argument flags to the command: - -- --zstd: ZSTD codec, compression level 1 -- --qat-gzip: QAT GZIP codec, compression level 1 -- --qat-zstd: QAT ZSTD codec, compression level 1 -- --iaa-gzip: IAA GZIP codec, compression level 1 - -Note using QAT or IAA codec requires Gluten cpp is built with these features. -Please check the corresponding section in [Velox document](../get-started/Velox.md) first for how to -setup, build and enable these features in Gluten. For QAT support, please -check [Intel® QuickAssist Technology (QAT) support](../get-started/Velox.md#intel-quickassist-technology-qat-support). -For IAA support, please -check [Intel® In-memory Analytics Accelerator (IAA/IAX) support](../get-started/Velox.md#intel-in-memory-analytics-accelerator-iaaiax-support) ## Simulate task spilling