From 305924a19025f5ff0a65de0d38bac9e14971ecd9 Mon Sep 17 00:00:00 2001 From: lgbo Date: Fri, 8 Dec 2023 11:21:55 +0800 Subject: [PATCH] [GLUTEN-3949][CH] Merge small blocks from upstream phase into a large one #3952 What changes were proposed in this pull request? (Please fill in changes proposed in this fix) Fixes: #3949 How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) unit tests (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) --- .../Parser/SerializedPlanParser.cpp | 2 +- cpp-ch/local-engine/Shuffle/ShuffleReader.cpp | 40 +++++++++- cpp-ch/local-engine/Shuffle/ShuffleReader.h | 1 + .../Storages/SourceFromJavaIter.cpp | 79 +++++++++++++------ .../Storages/SourceFromJavaIter.h | 6 +- 5 files changed, 101 insertions(+), 27 deletions(-) diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 636ccc843691..f70132b4d378 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -272,7 +272,7 @@ QueryPlanStepPtr SerializedPlanParser::parseReadRealWithJavaIter(const substrait auto iter_index = std::stoi(iter.substr(pos + 1, iter.size())); auto source = std::make_shared( - TypeParser::buildBlockFromNamedStruct(rel.base_schema()), input_iters[iter_index], materialize_inputs[iter_index]); + context, TypeParser::buildBlockFromNamedStruct(rel.base_schema()), input_iters[iter_index], materialize_inputs[iter_index]); QueryPlanStepPtr source_step = std::make_unique(Pipe(source)); source_step->setStepDescription("Read From Java Iter"); return source_step; diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp index 50165ca6661a..00b293f62d9b 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.cpp @@ -21,6 +21,7 @@ #include #include #include +#include using namespace DB; @@ -46,12 +47,47 @@ local_engine::ShuffleReader::ShuffleReader(std::unique_ptr in_, bool } Block * local_engine::ShuffleReader::read() { - auto block = input_stream->read(); - setCurrentBlock(block); + // Avoid to generate out a lot of small blocks. + const size_t at_least_block_size = 64 * 1024; + size_t total_rows = 0; + std::vector blocks; + if (pending_block) + { + blocks.emplace_back(std::move(pending_block)); + total_rows += blocks.back().rows(); + pending_block = {}; + } + + while(total_rows < at_least_block_size) + { + auto block = input_stream->read(); + if (!block.rows()) + { + break; + } + if (!blocks.empty() + && (blocks[0].info.is_overflows != block.info.is_overflows || blocks[0].info.bucket_num != block.info.bucket_num)) + { + pending_block = std::move(block); + break; + } + total_rows += block.rows(); + blocks.emplace_back(std::move(block)); + } + + DB::Block final_block; + if (!blocks.empty()) + { + auto block_info = blocks[0].info; + final_block = DB::concatenateBlocks(blocks); + final_block.info = block_info; + } + setCurrentBlock(final_block); if (unlikely(header.columns() == 0)) header = currentBlock().cloneEmpty(); return ¤tBlock(); } + ShuffleReader::~ShuffleReader() { in.reset(); diff --git a/cpp-ch/local-engine/Shuffle/ShuffleReader.h b/cpp-ch/local-engine/Shuffle/ShuffleReader.h index 082e75a26ca6..37b00f6ae6af 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleReader.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleReader.h @@ -45,6 +45,7 @@ class ShuffleReader : BlockIterator std::unique_ptr compressed_in; std::unique_ptr input_stream; DB::Block header; + DB::Block pending_block; }; diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp index 74d7d7054db7..4031f6ff5359 100644 --- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp +++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp @@ -25,6 +25,7 @@ #include #include + namespace local_engine { jclass SourceFromJavaIter::serialized_record_batch_iterator_class = nullptr; @@ -38,39 +39,71 @@ static DB::Block getRealHeader(const DB::Block & header) return header; return BlockUtil::buildRowCountHeader(); } -SourceFromJavaIter::SourceFromJavaIter(DB::Block header, jobject java_iter_, bool materialize_input_) - : DB::ISource(getRealHeader(header)), java_iter(java_iter_), materialize_input(materialize_input_), original_header(header) +SourceFromJavaIter::SourceFromJavaIter(DB::ContextPtr context_, DB::Block header, jobject java_iter_, bool materialize_input_) + : DB::ISource(getRealHeader(header)) + , java_iter(java_iter_) + , materialize_input(materialize_input_) + , original_header(header) + , context(context_) { } DB::Chunk SourceFromJavaIter::generate() { GET_JNIENV(env) - jboolean has_next = safeCallBooleanMethod(env, java_iter, serialized_record_batch_iterator_hasNext); + size_t max_block_size = context->getSettingsRef().max_block_size; DB::Chunk result; - if (has_next) + size_t total_rows = 0; + std::vector blocks; + if (pending_block.rows()) + { + total_rows += pending_block.rows(); + blocks.emplace_back(std::move(pending_block)); + pending_block = {}; + } + + while(total_rows < max_block_size) + { + jboolean has_next = safeCallBooleanMethod(env, java_iter, serialized_record_batch_iterator_hasNext); + if (!has_next) + break; + jbyteArray block_address = static_cast(safeCallObjectMethod(env, java_iter, serialized_record_batch_iterator_next)); + DB::Block * block = reinterpret_cast(byteArrayToLong(env, block_address)); + + if (!blocks.empty() && (blocks[0].info.is_overflows != block->info.is_overflows || blocks[0].info.bucket_num != block->info.bucket_num)) + { + pending_block = std::move(*block); + break; + } + + if (block->rows()) + { + total_rows += block->rows(); + blocks.emplace_back(std::move(*block)); + } + } + + if (total_rows) { - jbyteArray block = static_cast(safeCallObjectMethod(env, java_iter, serialized_record_batch_iterator_next)); - DB::Block * data = reinterpret_cast(byteArrayToLong(env, block)); - if(materialize_input) - materializeBlockInplace(*data); - if (data->rows() > 0) + if (original_header.columns()) + { + auto is_overflows = blocks[0].info.is_overflows; + auto bucket_num = blocks[0].info.bucket_num; + auto merged_block = DB::concatenateBlocks(blocks); + if (materialize_input) + materializeBlockInplace(merged_block); + result.setColumns(merged_block.getColumns(), total_rows); + convertNullable(result); + auto info = std::make_shared(); + info->is_overflows = is_overflows; + info->bucket_num = bucket_num; + result.setChunkInfo(info); + } + else { - size_t rows = data->rows(); - if (original_header.columns()) - { - result.setColumns(data->mutateColumns(), rows); - convertNullable(result); - auto info = std::make_shared(); - info->is_overflows = data->info.is_overflows; - info->bucket_num = data->info.bucket_num; - result.setChunkInfo(info); - } - else - { - result = BlockUtil::buildRowCountChunk(rows); - } + result = BlockUtil::buildRowCountChunk(total_rows); } } + CLEAN_JNIENV return result; } diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.h b/cpp-ch/local-engine/Storages/SourceFromJavaIter.h index a6582798103b..4ae51d45bc3a 100644 --- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.h +++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.h @@ -17,6 +17,7 @@ #pragma once #include #include +#include namespace local_engine { @@ -29,7 +30,7 @@ class SourceFromJavaIter : public DB::ISource static Int64 byteArrayToLong(JNIEnv * env, jbyteArray arr); - SourceFromJavaIter(DB::Block header, jobject java_iter_, bool materialize_input_); + SourceFromJavaIter(DB::ContextPtr context_, DB::Block header, jobject java_iter_, bool materialize_input_); ~SourceFromJavaIter() override; String getName() const override { return "SourceFromJavaIter"; } @@ -40,7 +41,10 @@ class SourceFromJavaIter : public DB::ISource jobject java_iter; bool materialize_input; + DB::ContextPtr context; DB::Block original_header; + + DB::Block pending_block; }; }