Skip to content

Commit

Permalink
modify option to use unlimited memory
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Jun 20, 2024
1 parent 27bf39d commit a3ac20b
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace gluten {

static constexpr int16_t kDefaultBatchSize = 4096;
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultSortBufferThreshold = std::numeric_limits<int64_t>::max();
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
static constexpr int32_t kDefaultNumSubDirs = 64;
static constexpr int32_t kDefaultCompressionThreshold = 100;
Expand All @@ -36,7 +36,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
static constexpr double kDefaultMergeBufferThreshold = 0.25;
static constexpr bool kEnableBufferedWrite = true;

enum ShuffleWriterType { kHashShuffle, kSortShuffle };
enum ShuffleWriterType { kHashShuffle, kSortShuffle, kSortShuffleV2 };
enum PartitionWriterType { kLocal, kRss };

struct ShuffleReaderOptions {
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ std::shared_ptr<VeloxShuffleWriter> createShuffleWriter(
options.partitioning = gluten::toPartitioning(FLAGS_partitioning);
if (FLAGS_shuffle_writer == "sort") {
options.shuffleWriterType = gluten::kSortShuffle;
} else if (FLAGS_shuffle_writer == "sortv2") {
options.shuffleWriterType = gluten::kSortShuffleV2;
}
auto shuffleWriter = runtime->createShuffleWriter(
FLAGS_shuffle_partitions, std::move(partitionWriter), std::move(options), memoryManager);
Expand Down Expand Up @@ -194,7 +196,7 @@ void runShuffle(
{
gluten::ScopedTimer timer(&totalTime);
while (resultIter->hasNext()) {
GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), ShuffleWriter::kMinMemLimit));
GLUTEN_THROW_NOT_OK(shuffleWriter->write(resultIter->next(), std::numeric_limits<int64_t>::max()));
}
GLUTEN_THROW_NOT_OK(shuffleWriter->stop());
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,10 @@ std::unique_ptr<ColumnarBatchIterator> VeloxColumnarBatchDeserializerFactory::cr
hasComplexType_,
deserializeTime_,
decompressTime_);
} else if (shuffleWriterType_ == kSortShuffle) {
  // Compare, do not assign: `shuffleWriterType_ = kSortShuffle` would overwrite
  // the member and, since kSortShuffle is a non-zero enumerator, always evaluate
  // to true, so every non-hash shuffle type would be routed here and the
  // fallback return after this branch could never be reached.
  return std::make_unique<VeloxShuffleReaderOutStreamWrapper>(
      veloxPool_, rowType_, batchSize_, veloxCompressionType_, deserializeTime_, std::move(in));
}
// return std::make_unique<VeloxShuffleReaderOutStreamWrapper>(
// veloxPool_, rowType_, batchSize_, veloxCompressionType_, deserializeTime_, std::move(in));
return std::make_unique<VeloxRowVectorDeserializer>(
std::move(in), schema_, rowType_, batchSize_, memoryPool_, veloxPool_, veloxCompressionType_, deserializeTime_);
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ arrow::Result<std::shared_ptr<VeloxShuffleWriter>> VeloxShuffleWriter::create(
return VeloxHashBasedShuffleWriter::create(
numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool);
case kSortShuffle:
return VeloxSortBasedShuffleWriter::create(
numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool);
case kSortShuffleV2:
return VeloxSortShuffleWriter::create(
numPartitions, std::move(partitionWriter), std::move(options), veloxPool, arrowPool);
default:
Expand Down

0 comments on commit a3ac20b

Please sign in to comment.