diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java index 295159dceefc..af4ab1ecfb49 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java @@ -33,7 +33,8 @@ public long make( boolean preferSpill, long spillThreshold, String hashAlgorithm, - boolean throwIfMemoryExceed) { + boolean throwIfMemoryExceed, + boolean flushBlockBufferBeforeEvict) { return nativeMake( part.getShortName(), part.getNumPartitions(), @@ -49,7 +50,8 @@ public long make( preferSpill, spillThreshold, hashAlgorithm, - throwIfMemoryExceed); + throwIfMemoryExceed, + flushBlockBufferBeforeEvict); } public long makeForRSS( @@ -61,7 +63,8 @@ public long makeForRSS( long spillThreshold, String hashAlgorithm, Object pusher, - boolean throwIfMemoryExceed) { + boolean throwIfMemoryExceed, + boolean flushBlockBufferBeforeEvict) { return nativeMakeForRSS( part.getShortName(), part.getNumPartitions(), @@ -74,7 +77,8 @@ public long makeForRSS( spillThreshold, hashAlgorithm, pusher, - throwIfMemoryExceed); + throwIfMemoryExceed, + flushBlockBufferBeforeEvict); } public native long nativeMake( @@ -92,7 +96,8 @@ public native long nativeMake( boolean preferSpill, long spillThreshold, String hashAlgorithm, - boolean throwIfMemoryExceed); + boolean throwIfMemoryExceed, + boolean flushBlockBufferBeforeEvict); public native long nativeMakeForRSS( String shortName, @@ -106,7 +111,8 @@ public native long nativeMakeForRSS( long spillThreshold, String hashAlgorithm, Object pusher, - boolean throwIfMemoryExceed); + boolean throwIfMemoryExceed, + boolean flushBlockBufferBeforeEvict); public native void split(long splitterId, long block); diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index b799c7eb820d..a434a72cfa8b 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -56,6 +56,8 @@ class CHColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT) private val preferSpill = GlutenConfig.getConf.chColumnarShufflePreferSpill private val throwIfMemoryExceed = GlutenConfig.getConf.chColumnarThrowIfMemoryExceed + private val flushBlockBufferBeforeEvict = + GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold private val jniWrapper = new CHShuffleSplitterJniWrapper // Are we in the process of stopping? Because map tasks can call stop() with success = true @@ -105,7 +107,8 @@ class CHColumnarShuffleWriter[K, V]( preferSpill, spillThreshold, CHBackendSettings.shuffleHashAlgorithm, - throwIfMemoryExceed + throwIfMemoryExceed, + flushBlockBufferBeforeEvict ) CHNativeMemoryAllocators.createSpillable( "ShuffleWriter", diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index 41a096ab5901..4a011625874e 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -90,8 +90,8 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitO partition_writer = std::make_unique(this); } - split_result.partition_length.resize(options.partition_num, 0); - split_result.raw_partition_length.resize(options.partition_num, 0); + split_result.partition_lengths.resize(options.partition_num, 0); + split_result.raw_partition_lengths.resize(options.partition_num, 0); } @@ 
-113,11 +113,6 @@ void CachedShuffleWriter::split(DB::Block & block) out_block.insert(block.getByPosition(output_columns_indicies[col_i])); } partition_writer->write(partition_info, out_block); - - if (options.spill_threshold > 0 && partition_writer->totalCacheSize() > options.spill_threshold) - { - partition_writer->evictPartitions(false); - } } void CachedShuffleWriter::initOutputIfNeeded(Block & block) @@ -146,12 +141,15 @@ void CachedShuffleWriter::initOutputIfNeeded(Block & block) SplitResult CachedShuffleWriter::stop() { partition_writer->stop(); + + static auto * logger = &Poco::Logger::get("CachedShuffleWriter"); + LOG_INFO(logger, "CachedShuffleWriter stop, split result: {}", split_result.toString()); return split_result; } size_t CachedShuffleWriter::evictPartitions() { - return partition_writer->evictPartitions(true); + return partition_writer->evictPartitions(true, options.flush_block_buffer_before_evict); } } diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index 0195ecf3870b..8e5915e0d799 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -27,53 +27,109 @@ #include #include #include +#include #include #include #include -using namespace DB; +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + +using namespace DB; namespace local_engine { -void local_engine::PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & data) +void PartitionWriter::write(const PartitionInfo & partition_info, DB::Block & block) { + /// PartitionWriter::write is always the top frame that occupies evicting_or_writing if (evicting_or_writing) - return; + throw Exception(ErrorCodes::LOGICAL_ERROR, "PartitionWriter::write is invoked with evicting_or_writing being occupied"); evicting_or_writing = true; SCOPE_EXIT({evicting_or_writing = false;}); - Stopwatch time; - - for (size_t partition_i = 0; partition_i < 
partition_info.partition_num; ++partition_i) + Stopwatch watch; + size_t current_cached_bytes = bytes(); + for (size_t partition_id = 0; partition_id < partition_info.partition_num; ++partition_id) { - size_t from = partition_info.partition_start_points[partition_i]; - size_t length = partition_info.partition_start_points[partition_i + 1] - from; + size_t from = partition_info.partition_start_points[partition_id]; + size_t length = partition_info.partition_start_points[partition_id + 1] - from; /// Make sure buffer size is no greater than split_size - auto & buffer = partition_block_buffer[partition_i]; - if (buffer->size() && buffer->size() + length >= shuffle_writer->options.split_size) + auto & block_buffer = partition_block_buffer[partition_id]; + auto & buffer = partition_buffer[partition_id]; + if (block_buffer->size() && block_buffer->size() + length >= shuffle_writer->options.split_size) + buffer->addBlock(block_buffer->releaseColumns()); + + current_cached_bytes -= block_buffer->bytes(); + for (size_t col_i = 0; col_i < block.columns(); ++col_i) + block_buffer->appendSelective(col_i, block, partition_info.partition_selector, from, length); + current_cached_bytes += block_buffer->bytes(); + + /// Only works for celeborn partitiion writer + if (supportsEvictSinglePartition() && options->spill_threshold > 0 && current_cached_bytes >= options->spill_threshold) { - Block block = buffer->releaseColumns(); - auto bytes = block.bytes(); - total_partition_buffer_size += bytes; - shuffle_writer->split_result.raw_partition_length[partition_i] += bytes; - partition_buffer[partition_i]->addBlock(std::move(block)); + /// If flush_block_buffer_before_evict is disabled, evict partitions from (last_partition_id+1)%partition_num to partition_id directly without flush, + /// Otherwise flush partition block buffer if it's size is no less than average rows, then evict partitions as above. 
+ if (!options->flush_block_buffer_before_evict) + { + for (size_t i = (last_partition_id + 1) % options->partition_num; i != (partition_id + 1) % options->partition_num; + i = (i + 1) % options->partition_num) + unsafeEvictSinglePartition(false, false, i); + } + else + { + /// Calculate average rows of each partition block buffer + size_t avg_size = 0; + size_t cnt = 0; + for (size_t i = (last_partition_id + 1) % options->partition_num; i != (partition_id + 1) % options->partition_num; + i = (i + 1) % options->partition_num) + { + avg_size += partition_block_buffer[i]->size(); + ++cnt; + } + avg_size /= cnt; + + + for (size_t i = (last_partition_id + 1) % options->partition_num; i != (partition_id + 1) % options->partition_num; + i = (i + 1) % options->partition_num) + { + bool flush_block_buffer = partition_block_buffer[i]->size() >= avg_size; + current_cached_bytes -= flush_block_buffer ? partition_block_buffer[i]->bytes() + partition_buffer[i]->bytes() + : partition_buffer[i]->bytes(); + unsafeEvictSinglePartition(false, flush_block_buffer, i); + } + // std::cout << "current cached bytes after evict partitions is " << current_cached_bytes << " partition from " + // << (last_partition_id + 1) % options->partition_num << " to " << partition_id << " average size:" << avg_size + // << std::endl; + } + + last_partition_id = partition_id; } + } - for (size_t col_i = 0; col_i < data.columns(); ++col_i) - buffer->appendSelective(col_i, data, partition_info.partition_selector, from, length); + /// Only works for local partition writer + if (!supportsEvictSinglePartition() && options->spill_threshold && current_cached_bytes >= options->spill_threshold) + { + unsafeEvictPartitions(false, options->flush_block_buffer_before_evict); } - shuffle_writer->split_result.total_split_time += time.elapsedNanoseconds(); + shuffle_writer->split_result.total_split_time += watch.elapsedNanoseconds(); } -size_t LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) +size_t 
LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill, bool flush_block_buffer) { size_t res = 0; + size_t spilled_bytes = 0; - auto spill_to_file = [this, &res]() -> void { + auto spill_to_file = [this, for_memory_spill, flush_block_buffer, &res, &spilled_bytes]() + { auto file = getNextSpillFile(); WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); @@ -86,16 +142,28 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) Stopwatch serialization_time_watch; for (size_t partition_id = 0; partition_id < partition_buffer.size(); ++partition_id) { + auto & buffer = partition_buffer[partition_id]; + + if (flush_block_buffer) + { + auto & block_buffer = partition_block_buffer[partition_id]; + if (!block_buffer->empty()) + buffer->addBlock(block_buffer->releaseColumns()); + } + + if (buffer->empty()) + continue; + PartitionSpillInfo partition_spill_info; partition_spill_info.start = output.count(); + spilled_bytes += buffer->bytes(); - auto & partition = partition_buffer[partition_id]; - size_t raw_size = partition->spill(writer); - res += raw_size; + size_t written_bytes = buffer->spill(writer); + res += written_bytes; compressed_output.sync(); partition_spill_info.length = output.count() - partition_spill_info.start; - shuffle_writer->split_result.raw_partition_length[partition_id] += raw_size; + shuffle_writer->split_result.raw_partition_lengths[partition_id] += written_bytes; partition_spill_info.partition_id = partition_id; info.partition_spill_infos.emplace_back(partition_spill_info); } @@ -119,20 +187,18 @@ size_t LocalPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) spill_to_file(); } shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; - 
total_partition_buffer_size = 0; - + shuffle_writer->split_result.total_bytes_spilled += spilled_bytes; return res; } -std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) +std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) { auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); - std::vector partition_length; - partition_length.resize(shuffle_writer->options.partition_num, 0); + std::vector partition_length(shuffle_writer->options.partition_num, 0); + std::vector spill_inputs; spill_inputs.reserve(spill_infos.size()); for (const auto & spill : spill_infos) @@ -172,14 +238,14 @@ std::vector LocalPartitionWriter::mergeSpills(WriteBuffer& data_file) partition_length[partition_id] = data_file.count() - size_before; shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_bytes_written += partition_length[partition_id]; - shuffle_writer->split_result.raw_partition_length[partition_id] += raw_size; + shuffle_writer->split_result.raw_partition_lengths[partition_id] += raw_size; } shuffle_writer->split_result.total_write_time += write_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_disk_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_serialize_time = shuffle_writer->split_result.total_serialize_time - shuffle_writer->split_result.total_disk_time - shuffle_writer->split_result.total_compress_time; - shuffle_writer->split_result.total_disk_time += merge_io_time; + shuffle_writer->split_result.total_io_time += compressed_output.getWriteTime(); + 
shuffle_writer->split_result.total_serialize_time = shuffle_writer->split_result.total_serialize_time - shuffle_writer->split_result.total_io_time - shuffle_writer->split_result.total_compress_time; + shuffle_writer->split_result.total_io_time += merge_io_time; for (const auto & spill : spill_infos) { @@ -211,7 +277,7 @@ void LocalPartitionWriter::unsafeStop() { WriteBufferFromFile output(options->data_file, options->io_buffer_size); auto offsets = mergeSpills(output); - shuffle_writer->split_result.partition_length = offsets; + shuffle_writer->split_result.partition_lengths = offsets; } PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) @@ -219,68 +285,108 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter * shuffle_writer_) , options(&shuffle_writer->options) , partition_block_buffer(options->partition_num) , partition_buffer(options->partition_num) + , last_partition_id(options->partition_num - 1) { - for (size_t partition_i = 0; partition_i < options->partition_num; ++partition_i) + for (size_t partition_id = 0; partition_id < options->partition_num; ++partition_id) { - partition_block_buffer[partition_i] = std::make_shared(options->split_size); - partition_buffer[partition_i] = std::make_shared(); + partition_block_buffer[partition_id] = std::make_shared(options->split_size); + partition_buffer[partition_id] = std::make_shared(); } } -size_t PartitionWriter::evictPartitions(bool for_memory_spill) +size_t PartitionWriter::evictPartitions(bool for_memory_spill, bool flush_block_buffer) { if (evicting_or_writing) return 0; evicting_or_writing = true; SCOPE_EXIT({evicting_or_writing = false;}); - return unsafeEvictPartitions(for_memory_spill); + return unsafeEvictPartitions(for_memory_spill, flush_block_buffer); } void PartitionWriter::stop() { if (evicting_or_writing) - return; + throw Exception(ErrorCodes::LOGICAL_ERROR, "PartitionWriter::stop is invoked with evicting_or_writing being occupied"); evicting_or_writing = true; 
SCOPE_EXIT({evicting_or_writing = false;}); return unsafeStop(); } +size_t PartitionWriter::bytes() const +{ + size_t bytes = 0; + + for (const auto & buffer : partition_block_buffer) + bytes += buffer->bytes(); + + for (const auto & buffer : partition_buffer) + bytes += buffer->bytes(); + + return bytes; +} + CelebornPartitionWriter::CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client_) : PartitionWriter(shuffleWriter), celeborn_client(std::move(celeborn_client_)) { } -size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) +size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill, bool flush_block_buffer) { size_t res = 0; + for (size_t partition_id = 0; partition_id < options->partition_num; ++partition_id) + { + res += unsafeEvictSinglePartition(for_memory_spill, flush_block_buffer, partition_id); + } + return res; +} - auto spill_to_celeborn = [this, for_memory_spill, &res]() +size_t CelebornPartitionWriter::unsafeEvictSinglePartition(bool for_memory_spill, bool flush_block_buffer, size_t partition_id) +{ + size_t res = 0; + size_t spilled_bytes = 0; + auto spill_to_celeborn = [this, for_memory_spill, flush_block_buffer, partition_id, &res, &spilled_bytes]() { Stopwatch serialization_time_watch; - for (size_t partition_id = 0; partition_id < partition_buffer.size(); ++partition_id) + auto & buffer = partition_buffer[partition_id]; + + if (flush_block_buffer) { - WriteBufferFromOwnString output; - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); - CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); - NativeWriter writer(compressed_output, shuffle_writer->output_header); - - auto & partition = partition_buffer[partition_id]; - size_t raw_size = partition->spill(writer); - res += raw_size; - compressed_output.sync(); + auto & block_buffer = 
partition_block_buffer[partition_id]; + if (!block_buffer->empty()) + { + // std::cout << "flush block buffer for partition:" << partition_id << " rows:" << block_buffer->size() << std::endl; + buffer->addBlock(block_buffer->releaseColumns()); + } + } - Stopwatch push_time_watch; - celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size()); + /// Skip empty buffer + if (buffer->empty()) + return; - shuffle_writer->split_result.partition_length[partition_id] += output.str().size(); - shuffle_writer->split_result.raw_partition_length[partition_id] += raw_size; - shuffle_writer->split_result.total_compress_time += compressed_output.getCompressTime(); - shuffle_writer->split_result.total_write_time += compressed_output.getWriteTime(); - shuffle_writer->split_result.total_write_time += push_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_disk_time += push_time_watch.elapsedNanoseconds(); - } + WriteBufferFromOwnString output; + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); + CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); + NativeWriter writer(compressed_output, shuffle_writer->output_header); + + spilled_bytes += buffer->bytes(); + size_t written_bytes = buffer->spill(writer); + res += written_bytes; + compressed_output.sync(); + + // std::cout << "evict partition " << partition_id << " uncompress_bytes:" << compressed_output.getUncompressedBytes() + // << " compress_bytes:" << compressed_output.getCompressedBytes() << std::endl; + + Stopwatch push_time_watch; + celeborn_client->pushPartitionData(partition_id, output.str().data(), output.str().size()); + + shuffle_writer->split_result.partition_lengths[partition_id] += output.str().size(); + shuffle_writer->split_result.raw_partition_lengths[partition_id] += written_bytes; + shuffle_writer->split_result.total_compress_time += 
compressed_output.getCompressTime(); + shuffle_writer->split_result.total_write_time += compressed_output.getWriteTime(); + shuffle_writer->split_result.total_write_time += push_time_watch.elapsedNanoseconds(); + shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds(); shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds(); }; @@ -298,26 +404,15 @@ size_t CelebornPartitionWriter::unsafeEvictPartitions(bool for_memory_spill) } shuffle_writer->split_result.total_spill_time += spill_time_watch.elapsedNanoseconds(); - shuffle_writer->split_result.total_bytes_spilled += total_partition_buffer_size; - total_partition_buffer_size = 0; + shuffle_writer->split_result.total_bytes_spilled += spilled_bytes; return res; } void CelebornPartitionWriter::unsafeStop() { - /// Push the remaining data to Celeborn - for (size_t partition_id = 0; partition_id < partition_block_buffer.size(); ++partition_id) - { - if (!partition_block_buffer[partition_id]->empty()) - { - Block block = partition_block_buffer[partition_id]->releaseColumns(); - partition_buffer[partition_id]->addBlock(std::move(block)); - } - } - - unsafeEvictPartitions(false); + unsafeEvictPartitions(false, true); - for (const auto & length : shuffle_writer->split_result.partition_length) + for (const auto & length : shuffle_writer->split_result.partition_lengths) { shuffle_writer->split_result.total_bytes_written += length; } @@ -329,22 +424,24 @@ void Partition::addBlock(DB::Block block) if (!block.rows()) return; + cached_bytes += block.bytes(); blocks.emplace_back(std::move(block)); } size_t Partition::spill(NativeWriter & writer) { - size_t total_size = 0; + size_t written_bytes = 0; for (auto & block : blocks) { - total_size += writer.write(block); + written_bytes += writer.write(block); /// Clear each block once it is serialized to reduce peak memory DB::Block().swap(block); } blocks.clear(); - return total_size; + cached_bytes = 0; + return 
written_bytes; } } diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h b/cpp-ch/local-engine/Shuffle/PartitionWriter.h index 12ea581be6bd..e1e0429a703f 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h @@ -38,7 +38,6 @@ struct SpillInfo std::vector partition_spill_infos; }; - class Partition { public: @@ -47,11 +46,14 @@ class Partition Partition(Partition && other) noexcept : blocks(std::move(other.blocks)) { } + bool empty() const { return blocks.empty(); } void addBlock(DB::Block block); size_t spill(NativeWriter & writer); + size_t bytes() const { return cached_bytes; } private: std::vector blocks; + size_t cached_bytes = 0; }; class CachedShuffleWriter; @@ -62,25 +64,37 @@ class PartitionWriter : boost::noncopyable explicit PartitionWriter(CachedShuffleWriter* shuffle_writer_); virtual ~PartitionWriter() = default; - void write(const PartitionInfo& info, DB::Block & data); - size_t evictPartitions(bool for_memory_spill = false); - void stop(); + virtual String getName() const = 0; - size_t totalCacheSize() const { return total_partition_buffer_size; } + void write(const PartitionInfo& info, DB::Block & block); + size_t evictPartitions(bool for_memory_spill = false, bool flush_block_buffer = false); + void stop(); protected: - virtual size_t unsafeEvictPartitions(bool for_memory_spill = false) = 0; + size_t bytes() const; + + virtual size_t unsafeEvictPartitions(bool for_memory_spill, bool flush_block_buffer = false) = 0; + + virtual bool supportsEvictSinglePartition() const { return false; } + + virtual size_t unsafeEvictSinglePartition(bool for_memory_spill, bool flush_block_buffer, size_t partition_id) + { + throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Evict single partition is not supported for {}", getName()); + } + virtual void unsafeStop() = 0; CachedShuffleWriter * shuffle_writer; - SplitOptions * options; + const SplitOptions * options; std::vector partition_block_buffer; 
std::vector partition_buffer; - size_t total_partition_buffer_size = 0; - + /// Make sure memory spill doesn't happen while write/stop are executed. bool evicting_or_writing{false}; + + /// Only valid in celeborn partition writer + size_t last_partition_id; }; class LocalPartitionWriter : public PartitionWriter @@ -89,12 +103,14 @@ class LocalPartitionWriter : public PartitionWriter explicit LocalPartitionWriter(CachedShuffleWriter * shuffle_writer); ~LocalPartitionWriter() override = default; + String getName() const override { return "LocalPartitionWriter"; } + protected: - size_t unsafeEvictPartitions(bool for_memory_spill) override; + size_t unsafeEvictPartitions(bool for_memory_spill, bool flush_block_buffer) override; void unsafeStop() override; String getNextSpillFile(); - std::vector mergeSpills(DB::WriteBuffer& data_file); + std::vector mergeSpills(DB::WriteBuffer & data_file); std::vector spill_infos; }; @@ -105,8 +121,14 @@ class CelebornPartitionWriter : public PartitionWriter CelebornPartitionWriter(CachedShuffleWriter * shuffleWriter, std::unique_ptr celeborn_client); ~CelebornPartitionWriter() override = default; + String getName() const override { return "CelebornPartitionWriter"; } + protected: - size_t unsafeEvictPartitions(bool for_memory_spill) override; + size_t unsafeEvictPartitions(bool for_memory_spill, bool flush_block_buffer) override; + + bool supportsEvictSinglePartition() const override { return true; } + size_t unsafeEvictSinglePartition(bool for_memory_spill, bool flush_block_buffer, size_t partition_id) override; + void unsafeStop() override; std::unique_ptr celeborn_client; diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp index b6aabf59344b..dc5d369542e1 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp @@ -63,10 +63,10 @@ SplitResult ShuffleSplitter::stop() if (item) { split_result.total_compress_time += 
item->getCompressTime(); - split_result.total_disk_time += item->getWriteTime(); + split_result.total_io_time += item->getWriteTime(); } } - split_result.total_serialize_time = split_result.total_spill_time - split_result.total_compress_time - split_result.total_disk_time; + split_result.total_serialize_time = split_result.total_spill_time - split_result.total_compress_time - split_result.total_io_time; partition_outputs.clear(); partition_cached_write_buffers.clear(); partition_write_buffers.clear(); @@ -143,13 +143,13 @@ void ShuffleSplitter::init() partition_outputs.resize(options.partition_num); partition_write_buffers.resize(options.partition_num); partition_cached_write_buffers.resize(options.partition_num); - split_result.partition_length.resize(options.partition_num); - split_result.raw_partition_length.resize(options.partition_num); + split_result.partition_lengths.resize(options.partition_num); + split_result.raw_partition_lengths.resize(options.partition_num); for (size_t partition_i = 0; partition_i < options.partition_num; ++partition_i) { partition_buffer[partition_i] = std::make_shared(options.split_size); - split_result.partition_length[partition_i] = 0; - split_result.raw_partition_length[partition_i] = 0; + split_result.partition_lengths[partition_i] = 0; + split_result.raw_partition_lengths[partition_i] = 0; } } @@ -186,13 +186,13 @@ void ShuffleSplitter::mergePartitionFiles() { auto bytes = reader.readBig(buffer.data(), buffer_size); data_write_buffer.write(buffer.data(), bytes); - split_result.partition_length[i] += bytes; + split_result.partition_lengths[i] += bytes; split_result.total_bytes_written += bytes; } reader.close(); std::filesystem::remove(file); } - split_result.total_disk_time += merge_io_time.elapsedNanoseconds(); + split_result.total_io_time += merge_io_time.elapsedNanoseconds(); data_write_buffer.close(); } @@ -253,7 +253,7 @@ void ShuffleSplitter::writeIndexFile() { auto index_file = options.data_file + ".index"; auto writer = 
std::make_unique(index_file, options.io_buffer_size, O_CREAT | O_WRONLY | O_TRUNC); - for (auto len : split_result.partition_length) + for (auto len : split_result.partition_lengths) { DB::writeIntText(len, *writer); DB::writeChar('\n', *writer); @@ -262,9 +262,10 @@ void ShuffleSplitter::writeIndexFile() void ColumnsBuffer::add(DB::Block & block, int start, int end) { - if (header.columns() == 0) + if (!header) header = block.cloneEmpty(); - if (accumulated_columns.empty()) [[unlikely]] + + if (accumulated_columns.empty()) { accumulated_columns.reserve(block.columns()); for (size_t i = 0; i < block.columns(); i++) @@ -274,6 +275,7 @@ void ColumnsBuffer::add(DB::Block & block, int start, int end) accumulated_columns.emplace_back(std::move(column)); } } + assert(!accumulated_columns.empty()); for (size_t i = 0; i < block.columns(); ++i) { @@ -291,9 +293,10 @@ void ColumnsBuffer::add(DB::Block & block, int start, int end) void ColumnsBuffer::appendSelective( size_t column_idx, const DB::Block & source, const DB::IColumn::Selector & selector, size_t from, size_t length) { - if (header.columns() == 0) + if (!header) header = source.cloneEmpty(); - if (accumulated_columns.empty()) [[unlikely]] + + if (accumulated_columns.empty()) { accumulated_columns.reserve(source.columns()); for (size_t i = 0; i < source.columns(); i++) @@ -303,6 +306,7 @@ void ColumnsBuffer::appendSelective( accumulated_columns.emplace_back(std::move(column)); } } + if (!accumulated_columns[column_idx]->onlyNull()) { accumulated_columns[column_idx]->insertRangeSelective( @@ -326,16 +330,13 @@ bool ColumnsBuffer::empty() const DB::Block ColumnsBuffer::releaseColumns() { - DB::Columns res(std::make_move_iterator(accumulated_columns.begin()), std::make_move_iterator(accumulated_columns.end())); + DB::Columns columns(std::make_move_iterator(accumulated_columns.begin()), std::make_move_iterator(accumulated_columns.end())); accumulated_columns.clear(); - if (res.empty()) - { + + if (columns.empty()) 
return header.cloneEmpty(); - } else - { - return header.cloneWithColumns(res); - } + return header.cloneWithColumns(columns); } DB::Block ColumnsBuffer::getHeader() diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h index 600b87d8c99b..8f796914d1ce 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h @@ -48,6 +48,8 @@ struct SplitOptions size_t spill_threshold = 300 * 1024 * 1024; std::string hash_algorithm; bool throw_if_memory_exceed = true; + /// Whether to flush partition_block_buffer in PartitionWriter before evict. + bool flush_block_buffer_before_evict = false; }; class ColumnsBuffer @@ -59,11 +61,11 @@ class ColumnsBuffer void add(DB::Block & columns, int start, int end); void appendSelective(size_t column_idx, const DB::Block & source, const DB::IColumn::Selector & selector, size_t from, size_t length); + DB::Block getHeader(); size_t size() const; bool empty() const; DB::Block releaseColumns(); - DB::Block getHeader(); size_t bytes() const { @@ -73,14 +75,6 @@ class ColumnsBuffer return res; } - size_t allocatedBytes() const - { - size_t res = 0; - for (const auto & col : accumulated_columns) - res += col->allocatedBytes(); - return res; - } - private: DB::MutableColumns accumulated_columns; DB::Block header; @@ -90,17 +84,33 @@ using ColumnsBufferPtr = std::shared_ptr; struct SplitResult { - Int64 total_compute_pid_time = 0; - Int64 total_write_time = 0; - Int64 total_spill_time = 0; - Int64 total_compress_time = 0; - Int64 total_bytes_written = 0; - Int64 total_bytes_spilled = 0; - std::vector partition_length; - std::vector raw_partition_length; - Int64 total_split_time = 0; - Int64 total_disk_time = 0; - Int64 total_serialize_time = 0; + UInt64 total_compute_pid_time = 0; // Total nanoseconds to compute partition id + UInt64 total_write_time = 0; // Total nanoseconds to write data to local/celeborn, including the time writing to buffer + 
UInt64 total_spill_time = 0; // Total nanoseconds to execute PartitionWriter::evictPartitions + UInt64 total_compress_time = 0; // Total nanoseconds to execute compression before writing data to local/celeborn + UInt64 total_bytes_written = 0; // Sum of partition_length + UInt64 total_bytes_spilled = 0; // Total bytes of blocks spilled to local/celeborn before serialization and compression + std::vector partition_lengths; // Total written bytes of each partition after serialization and compression + std::vector raw_partition_lengths; // Total written bytes of each partition after serialization + UInt64 total_split_time = 0; // Total nanoseconds to execute CachedShuffleWriter::split, excluding total_compute_pid_time + UInt64 total_io_time = 0; // Total nanoseconds to write data to local/celeborn, excluding the time writing to buffer + UInt64 total_serialize_time = 0; // Total nanoseconds to execute spill_to_file/spill_to_celeborn. Bad naming, it works not as the name suggests. + + String toString() const + { + std::ostringstream oss; + + auto to_seconds = [](UInt64 nanoseconds) -> double { + return static_cast(nanoseconds) / 1000000000ULL; + }; + + oss << "compute_pid_time(s):" << to_seconds(total_compute_pid_time) << " split_time(s):" << to_seconds(total_split_time) + << " spill time(s):" << to_seconds(total_spill_time) << " serialize_time(s):" << to_seconds(total_serialize_time) + << " compress_time(s):" << to_seconds(total_compress_time) << " write_time(s):" << to_seconds(total_write_time) + << " bytes_writen:" << total_bytes_written << " bytes_spilled:" << total_bytes_spilled + << " partition_num: " << partition_lengths.size() << std::endl; + return oss.str(); + } }; class ShuffleSplitter; @@ -122,7 +132,7 @@ class ShuffleSplitter : public ShuffleWriterBase void split(DB::Block & block) override; virtual void computeAndCountPartitionId(DB::Block &) { } - std::vector getPartitionLength() const { return split_result.partition_length; } + std::vector 
getPartitionLength() const { return split_result.partition_lengths; } void writeIndexFile(); SplitResult stop() override; diff --git a/cpp-ch/local-engine/Storages/IO/CompressedWriteBuffer.cpp b/cpp-ch/local-engine/Storages/IO/CompressedWriteBuffer.cpp index 4dbc1c03c49f..f58c62321183 100644 --- a/cpp-ch/local-engine/Storages/IO/CompressedWriteBuffer.cpp +++ b/cpp-ch/local-engine/Storages/IO/CompressedWriteBuffer.cpp @@ -53,7 +53,6 @@ void CompressedWriteBuffer::nextImpl() if (out.available() >= compressed_reserve_size + sizeof(CityHash_v1_0_2::uint128)) { char * out_compressed_ptr = out.position() + sizeof(CityHash_v1_0_2::uint128); - compress_time_watch.start(); UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, out_compressed_ptr); compress_time += compress_time_watch.elapsedNanoseconds(); CityHash_v1_0_2::uint128 checksum_(0,0); @@ -71,7 +70,6 @@ void CompressedWriteBuffer::nextImpl() else { compressed_buffer.resize(compressed_reserve_size); - compress_time_watch.start(); UInt32 compressed_size = codec->compress(working_buffer.begin(), decompressed_size, compressed_buffer.data()); compress_time += compress_time_watch.elapsedNanoseconds(); CityHash_v1_0_2::uint128 checksum_(0,0); @@ -83,7 +81,6 @@ void CompressedWriteBuffer::nextImpl() writeBinaryLittleEndian(checksum_.high64, out); Stopwatch write_time_watch; - write_time_watch.start(); out.write(compressed_buffer.data(), compressed_size); write_time += write_time_watch.elapsedNanoseconds(); } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 6659fe3c6b8b..bb1b4543e8c1 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -655,7 +655,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jboolean prefer_spill, jlong spill_threshold, jstring hash_algorithm, - jboolean throw_if_memory_exceed) + jboolean throw_if_memory_exceed, + jboolean 
flush_block_buffer_before_evict) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; @@ -698,7 +699,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .compress_method = jstring2string(env, codec), .spill_threshold = static_cast(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), - .throw_if_memory_exceed = static_cast(throw_if_memory_exceed)}; + .throw_if_memory_exceed = static_cast(throw_if_memory_exceed), + .flush_block_buffer_before_evict = static_cast(flush_block_buffer_before_evict)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; if (prefer_spill) @@ -727,7 +729,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jlong spill_threshold, jstring hash_algorithm, jobject pusher, - jboolean throw_if_memory_exceed) + jboolean throw_if_memory_exceed, + jboolean flush_block_buffer_before_evict) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; @@ -761,7 +764,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .compress_method = jstring2string(env, codec), .spill_threshold = static_cast(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), - .throw_if_memory_exceed = static_cast(throw_if_memory_exceed)}; + .throw_if_memory_exceed = static_cast(throw_if_memory_exceed), + .flush_block_buffer_before_evict = static_cast(flush_block_buffer_before_evict)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; splitter = new local_engine::SplitterHolder{.splitter = std::make_unique(name, options, pusher)}; @@ -790,14 +794,16 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_evi JNIEXPORT jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_stop(JNIEnv * env, jobject, jlong splitterId) { LOCAL_ENGINE_JNI_METHOD_START + local_engine::SplitterHolder * splitter = reinterpret_cast(splitterId); auto result = 
splitter->splitter->stop(); - const auto & partition_lengths = result.partition_length; + + const auto & partition_lengths = result.partition_lengths; auto * partition_length_arr = env->NewLongArray(partition_lengths.size()); const auto * src = reinterpret_cast(partition_lengths.data()); env->SetLongArrayRegion(partition_length_arr, 0, partition_lengths.size(), src); - const auto & raw_partition_lengths = result.raw_partition_length; + const auto & raw_partition_lengths = result.raw_partition_lengths; auto * raw_partition_length_arr = env->NewLongArray(raw_partition_lengths.size()); const auto * raw_src = reinterpret_cast(raw_partition_lengths.data()); env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src); @@ -814,7 +820,7 @@ JNIEXPORT jobject Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_s partition_length_arr, raw_partition_length_arr, result.total_split_time, - result.total_disk_time, + result.total_io_time, result.total_serialize_time); return split_result; diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 4580d04ba716..795c50ce47df 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -73,7 +73,8 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V]( GlutenConfig.getConf.chColumnarShuffleSpillThreshold, CHBackendSettings.shuffleHashAlgorithm, celebornPartitionPusher, - GlutenConfig.getConf.chColumnarThrowIfMemoryExceed + GlutenConfig.getConf.chColumnarThrowIfMemoryExceed, + GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict ) CHNativeMemoryAllocators.createSpillable( "CelebornShuffleWriter", diff --git 
a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala index 92dc9a291c98..c88208089738 100644 --- a/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala +++ b/shims/common/src/main/scala/io/glutenproject/GlutenConfig.scala @@ -243,6 +243,9 @@ class GlutenConfig(conf: SQLConf) extends Logging { def chColumnarThrowIfMemoryExceed: Boolean = conf.getConf(COLUMNAR_CH_THROW_IF_MEMORY_EXCEED) + def chColumnarFlushBlockBufferBeforeEvict: Boolean = + conf.getConf(COLUMNAR_CH_FLUSH_BLOCK_BUFFER_BEFORE_EVICT) + def transformPlanLogLevel: String = conf.getConf(TRANSFORM_PLAN_LOG_LEVEL) def substraitPlanLogLevel: String = conf.getConf(SUBSTRAIT_PLAN_LOG_LEVEL) @@ -1131,6 +1134,13 @@ object GlutenConfig { .booleanConf .createWithDefault(true) + val COLUMNAR_CH_FLUSH_BLOCK_BUFFER_BEFORE_EVICT = + buildConf("spark.gluten.sql.columnar.backend.ch.flushBlockBufferBeforeEvict") + .internal() + .doc("Whether to flush partition_block_buffer before execute evict in CH PartitionWriter.") + .booleanConf + .createWithDefault(false) + val TRANSFORM_PLAN_LOG_LEVEL = buildConf("spark.gluten.sql.transform.logLevel") .internal()