Skip to content

Commit

Permalink
[GLUTEN-3890][CH] Respect spill_threshold for all buffers in shuffle writer (#3891)
Browse files Browse the repository at this point in the history

What changes were proposed in this pull request?
Fixes #3890 and improves some code style.
  • Loading branch information
taiyang-li authored Dec 7, 2023
1 parent b2507b5 commit a935707
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public long make(
boolean preferSpill,
long spillThreshold,
String hashAlgorithm,
boolean throwIfMemoryExceed) {
boolean throwIfMemoryExceed,
boolean flushBlockBufferBeforeEvict) {
return nativeMake(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -49,7 +50,8 @@ public long make(
preferSpill,
spillThreshold,
hashAlgorithm,
throwIfMemoryExceed);
throwIfMemoryExceed,
flushBlockBufferBeforeEvict);
}

public long makeForRSS(
Expand All @@ -61,7 +63,8 @@ public long makeForRSS(
long spillThreshold,
String hashAlgorithm,
Object pusher,
boolean throwIfMemoryExceed) {
boolean throwIfMemoryExceed,
boolean flushBlockBufferBeforeEvict) {
return nativeMakeForRSS(
part.getShortName(),
part.getNumPartitions(),
Expand All @@ -74,7 +77,8 @@ public long makeForRSS(
spillThreshold,
hashAlgorithm,
pusher,
throwIfMemoryExceed);
throwIfMemoryExceed,
flushBlockBufferBeforeEvict);
}

public native long nativeMake(
Expand All @@ -92,7 +96,8 @@ public native long nativeMake(
boolean preferSpill,
long spillThreshold,
String hashAlgorithm,
boolean throwIfMemoryExceed);
boolean throwIfMemoryExceed,
boolean flushBlockBufferBeforeEvict);

public native long nativeMakeForRSS(
String shortName,
Expand All @@ -106,7 +111,8 @@ public native long nativeMakeForRSS(
long spillThreshold,
String hashAlgorithm,
Object pusher,
boolean throwIfMemoryExceed);
boolean throwIfMemoryExceed,
boolean flushBlockBufferBeforeEvict);

public native void split(long splitterId, long block);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class CHColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
private val preferSpill = GlutenConfig.getConf.chColumnarShufflePreferSpill
private val throwIfMemoryExceed = GlutenConfig.getConf.chColumnarThrowIfMemoryExceed
private val flushBlockBufferBeforeEvict =
GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict
private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold
private val jniWrapper = new CHShuffleSplitterJniWrapper
// Are we in the process of stopping? Because map tasks can call stop() with success = true
Expand Down Expand Up @@ -105,7 +107,8 @@ class CHColumnarShuffleWriter[K, V](
preferSpill,
spillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
throwIfMemoryExceed
throwIfMemoryExceed,
flushBlockBufferBeforeEvict
)
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
Expand Down
14 changes: 6 additions & 8 deletions cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitO
partition_writer = std::make_unique<LocalPartitionWriter>(this);
}

split_result.partition_length.resize(options.partition_num, 0);
split_result.raw_partition_length.resize(options.partition_num, 0);
split_result.partition_lengths.resize(options.partition_num, 0);
split_result.raw_partition_lengths.resize(options.partition_num, 0);
}


Expand All @@ -113,11 +113,6 @@ void CachedShuffleWriter::split(DB::Block & block)
out_block.insert(block.getByPosition(output_columns_indicies[col_i]));
}
partition_writer->write(partition_info, out_block);

if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold)
{
partition_writer->evictPartitions(false);
}
}

void CachedShuffleWriter::initOutputIfNeeded(Block & block)
Expand Down Expand Up @@ -146,12 +141,15 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & block)
/// Finish the shuffle write and hand back the accumulated split statistics.
///
/// Stops the partition writer first, then logs `split_result` at INFO level and
/// returns `split_result` by value.
SplitResult CachedShuffleWriter::stop()
{
partition_writer->stop();

// Function-local static: the logger is looked up once and reused on later calls.
static auto * logger = &Poco::Logger::get("CachedShuffleWriter");
LOG_INFO(logger, "CachedShuffleWriter stop, split result: {}", split_result.toString());
return split_result;
}

size_t CachedShuffleWriter::evictPartitions()
{
return partition_writer->evictPartitions(true);
return partition_writer->evictPartitions(true, options.flush_block_buffer_before_evict);
}

}
Loading

0 comments on commit a935707

Please sign in to comment.