Skip to content

Commit

Permalink
update doc and remove shuffle split benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed May 31, 2024
1 parent e900502 commit c5a1c21
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 432 deletions.
1 change: 0 additions & 1 deletion cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "shuffle/Payload.h"
#include "shuffle/Spill.h"
#include "shuffle/Utils.h"
#include "utils/Timer.h"

namespace gluten {

Expand Down
2 changes: 2 additions & 0 deletions cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ arrow::Status UncompressedDiskBlockPayload::serialize(arrow::io::OutputStream* o
}

arrow::Result<std::shared_ptr<arrow::Buffer>> UncompressedDiskBlockPayload::readUncompressedBuffer() {
ScopedTimer timer(&writeTime_);
readPos_++;
int64_t bufferLength;
RETURN_NOT_OK(inputStream_->Read(sizeof(int64_t), &bufferLength));
Expand All @@ -523,6 +524,7 @@ CompressedDiskBlockPayload::CompressedDiskBlockPayload(
: Payload(Type::kCompressed, numRows, isValidityBuffer), inputStream_(inputStream), rawSize_(rawSize) {}

arrow::Status CompressedDiskBlockPayload::serialize(arrow::io::OutputStream* outputStream) {
ScopedTimer timer(&writeTime_);
ARROW_ASSIGN_OR_RAISE(auto block, inputStream_->Read(rawSize_));
RETURN_NOT_OK(outputStream->Write(block));
return arrow::Status::OK();
Expand Down
2 changes: 0 additions & 2 deletions cpp/velox/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc)

add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc)

add_velox_benchmark(shuffle_split_benchmark ShuffleSplitBenchmark.cc)

if(ENABLE_ORC)
add_velox_benchmark(orc_converter exec/OrcConverter.cc)
endif()
55 changes: 34 additions & 21 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
#include "compute/VeloxRuntime.h"
#include "config/GlutenConfig.h"
#include "shuffle/LocalPartitionWriter.h"
#include "shuffle/VeloxHashBasedShuffleWriter.h"
#include "shuffle/VeloxShuffleWriter.h"
#include "shuffle/rss/RssPartitionWriter.h"
#include "utils/StringUtil.h"
#include "utils/Timer.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/exception.h"
#include "utils/tests/LocalRssClient.h"
Expand All @@ -50,10 +50,10 @@ DEFINE_bool(with_shuffle, false, "Add shuffle split at end.");
DEFINE_string(partitioning, "rr", "Short partitioning name. Valid options are rr, hash, range, single");
DEFINE_string(shuffle_writer, "hash", "Shuffle writer type. Can be hash or sort");
DEFINE_bool(rss, false, "Mocking rss.");
DEFINE_bool(zstd, false, "Use ZSTD as shuffle compression codec");
DEFINE_bool(qat_gzip, false, "Use QAT GZIP as shuffle compression codec");
DEFINE_bool(qat_zstd, false, "Use QAT ZSTD as shuffle compression codec");
DEFINE_bool(iaa_gzip, false, "Use IAA GZIP as shuffle compression codec");
DEFINE_string(
compression,
"lz4",
"Specify the compression codec. Valid options are lz4, zstd, qat_gzip, qat_zstd, iaa_gzip");
DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions");
DEFINE_bool(run_example, false, "Run the example and exit.");

Expand All @@ -72,6 +72,9 @@ struct WriterMetrics {
int64_t evictTime;
int64_t writeTime;
int64_t compressTime;

public:
explicit WriterMetrics() : splitTime(0), evictTime(0), writeTime(0), compressTime(0) {}
};

std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
Expand All @@ -80,16 +83,21 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
const std::string& dataFile,
const std::vector<std::string>& localDirs) {
PartitionWriterOptions partitionWriterOptions{};
if (FLAGS_zstd) {

// Configure compression.
if (FLAGS_compression == "lz4") {
partitionWriterOptions.codecBackend = CodecBackend::NONE;
partitionWriterOptions.compressionType = arrow::Compression::LZ4_FRAME;
} else if (FLAGS_compression == "zstd") {
partitionWriterOptions.codecBackend = CodecBackend::NONE;
partitionWriterOptions.compressionType = arrow::Compression::ZSTD;
} else if (FLAGS_qat_gzip) {
} else if (FLAGS_compression == "qat_gzip") {
partitionWriterOptions.codecBackend = CodecBackend::QAT;
partitionWriterOptions.compressionType = arrow::Compression::GZIP;
} else if (FLAGS_qat_zstd) {
} else if (FLAGS_compression == "qat_zstd") {
partitionWriterOptions.codecBackend = CodecBackend::QAT;
partitionWriterOptions.compressionType = arrow::Compression::ZSTD;
} else if (FLAGS_iaa_gzip) {
} else if (FLAGS_compression == "iaa_gzip") {
partitionWriterOptions.codecBackend = CodecBackend::IAA;
partitionWriterOptions.compressionType = arrow::Compression::GZIP;
}
Expand Down Expand Up @@ -124,14 +132,15 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(

void populateWriterMetrics(
const std::shared_ptr<VeloxShuffleWriter>& shuffleWriter,
int64_t shuffleWriteTime,
int64_t totalTime,
WriterMetrics& metrics) {
metrics.compressTime += shuffleWriter->totalCompressTime();
metrics.evictTime += shuffleWriter->totalEvictTime();
metrics.writeTime += shuffleWriter->totalWriteTime();
metrics.evictTime +=
(shuffleWriteTime - shuffleWriter->totalCompressTime() - shuffleWriter->totalEvictTime() -
shuffleWriter->totalWriteTime());
auto splitTime = totalTime - metrics.compressTime - metrics.evictTime - metrics.writeTime;
if (splitTime > 0) {
metrics.splitTime += splitTime;
}
}

} // namespace
Expand Down Expand Up @@ -191,19 +200,23 @@ auto BM_Generic = [](::benchmark::State& state,
}
auto veloxPlan = dynamic_cast<gluten::VeloxRuntime*>(runtime)->getVeloxPlan();
if (FLAGS_with_shuffle) {
int64_t shuffleWriteTime;
TIME_NANO_START(shuffleWriteTime);
std::string dataFile;
std::vector<std::string> localDirs;
bool isFromEnv;
GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(dataFile, localDirs, isFromEnv));
const auto& shuffleWriter = createShuffleWriter(runtime, memoryManager.get(), dataFile, localDirs);
while (resultIter->hasNext()) {
GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit));

auto shuffleWriter = createShuffleWriter(runtime, memoryManager.get(), dataFile, localDirs);

int64_t totalTime = 0;
{
gluten::ScopedTimer timer(&totalTime);
while (resultIter->hasNext()) {
GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit));
}
GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
}
GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
TIME_NANO_END(shuffleWriteTime);
populateWriterMetrics(shuffleWriter, shuffleWriteTime, writerMetrics);

populateWriterMetrics(shuffleWriter, totalTime, writerMetrics);
// Cleanup shuffle outputs
cleanupShuffleOutput(dataFile, localDirs, isFromEnv);
} else {
Expand Down
Loading

0 comments on commit c5a1c21

Please sign in to comment.