Skip to content

Commit

Permalink
[GLUTEN-3949][CH] Merge small blocks from upstream phase into a large…
Browse files Browse the repository at this point in the history
… one #3952

What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)

Fixes: #3949

How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

unit tests

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
  • Loading branch information
lgbo-ustc authored Dec 8, 2023
1 parent 2ccea31 commit 305924a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ QueryPlanStepPtr SerializedPlanParser::parseReadRealWithJavaIter(const substrait
auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));

auto source = std::make_shared<SourceFromJavaIter>(
TypeParser::buildBlockFromNamedStruct(rel.base_schema()), input_iters[iter_index], materialize_inputs[iter_index]);
context, TypeParser::buildBlockFromNamedStruct(rel.base_schema()), input_iters[iter_index], materialize_inputs[iter_index]);
QueryPlanStepPtr source_step = std::make_unique<ReadFromPreparedSource>(Pipe(source));
source_step->setStepDescription("Read From Java Iter");
return source_step;
Expand Down
40 changes: 38 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Common/DebugUtils.h>
#include <Common/JNIUtils.h>
#include <Common/Stopwatch.h>
#include <Core/Block.h>

using namespace DB;

Expand All @@ -46,12 +47,47 @@ local_engine::ShuffleReader::ShuffleReader(std::unique_ptr<ReadBuffer> in_, bool
}
Block * local_engine::ShuffleReader::read()
{
auto block = input_stream->read();
setCurrentBlock(block);
// Avoid to generate out a lot of small blocks.
const size_t at_least_block_size = 64 * 1024;
size_t total_rows = 0;
std::vector<DB::Block> blocks;
if (pending_block)
{
blocks.emplace_back(std::move(pending_block));
total_rows += blocks.back().rows();
pending_block = {};
}

while(total_rows < at_least_block_size)
{
auto block = input_stream->read();
if (!block.rows())
{
break;
}
if (!blocks.empty()
&& (blocks[0].info.is_overflows != block.info.is_overflows || blocks[0].info.bucket_num != block.info.bucket_num))
{
pending_block = std::move(block);
break;
}
total_rows += block.rows();
blocks.emplace_back(std::move(block));
}

DB::Block final_block;
if (!blocks.empty())
{
auto block_info = blocks[0].info;
final_block = DB::concatenateBlocks(blocks);
final_block.info = block_info;
}
setCurrentBlock(final_block);
if (unlikely(header.columns() == 0))
header = currentBlock().cloneEmpty();
return &currentBlock();
}

ShuffleReader::~ShuffleReader()
{
in.reset();
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/Shuffle/ShuffleReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ShuffleReader : BlockIterator
std::unique_ptr<DB::ReadBuffer> compressed_in;
std::unique_ptr<local_engine::NativeReader> input_stream;
DB::Block header;
DB::Block pending_block;
};


Expand Down
79 changes: 56 additions & 23 deletions cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Common/Exception.h>
#include <Common/JNIUtils.h>


namespace local_engine
{
jclass SourceFromJavaIter::serialized_record_batch_iterator_class = nullptr;
Expand All @@ -38,39 +39,71 @@ static DB::Block getRealHeader(const DB::Block & header)
return header;
return BlockUtil::buildRowCountHeader();
}
SourceFromJavaIter::SourceFromJavaIter(DB::Block header, jobject java_iter_, bool materialize_input_)
: DB::ISource(getRealHeader(header)), java_iter(java_iter_), materialize_input(materialize_input_), original_header(header)
SourceFromJavaIter::SourceFromJavaIter(DB::ContextPtr context_, DB::Block header, jobject java_iter_, bool materialize_input_)
: DB::ISource(getRealHeader(header))
, java_iter(java_iter_)
, materialize_input(materialize_input_)
, original_header(header)
, context(context_)
{
}
DB::Chunk SourceFromJavaIter::generate()
{
GET_JNIENV(env)
jboolean has_next = safeCallBooleanMethod(env, java_iter, serialized_record_batch_iterator_hasNext);
size_t max_block_size = context->getSettingsRef().max_block_size;
DB::Chunk result;
if (has_next)
size_t total_rows = 0;
std::vector<DB::Block> blocks;
if (pending_block.rows())
{
total_rows += pending_block.rows();
blocks.emplace_back(std::move(pending_block));
pending_block = {};
}

while(total_rows < max_block_size)
{
jboolean has_next = safeCallBooleanMethod(env, java_iter, serialized_record_batch_iterator_hasNext);
if (!has_next)
break;
jbyteArray block_address = static_cast<jbyteArray>(safeCallObjectMethod(env, java_iter, serialized_record_batch_iterator_next));
DB::Block * block = reinterpret_cast<DB::Block *>(byteArrayToLong(env, block_address));

if (!blocks.empty() && (blocks[0].info.is_overflows != block->info.is_overflows || blocks[0].info.bucket_num != block->info.bucket_num))
{
pending_block = std::move(*block);
break;
}

if (block->rows())
{
total_rows += block->rows();
blocks.emplace_back(std::move(*block));
}
}

if (total_rows)
{
jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env, java_iter, serialized_record_batch_iterator_next));
DB::Block * data = reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
if(materialize_input)
materializeBlockInplace(*data);
if (data->rows() > 0)
if (original_header.columns())
{
auto is_overflows = blocks[0].info.is_overflows;
auto bucket_num = blocks[0].info.bucket_num;
auto merged_block = DB::concatenateBlocks(blocks);
if (materialize_input)
materializeBlockInplace(merged_block);
result.setColumns(merged_block.getColumns(), total_rows);
convertNullable(result);
auto info = std::make_shared<DB::AggregatedChunkInfo>();
info->is_overflows = is_overflows;
info->bucket_num = bucket_num;
result.setChunkInfo(info);
}
else
{
size_t rows = data->rows();
if (original_header.columns())
{
result.setColumns(data->mutateColumns(), rows);
convertNullable(result);
auto info = std::make_shared<DB::AggregatedChunkInfo>();
info->is_overflows = data->info.is_overflows;
info->bucket_num = data->info.bucket_num;
result.setChunkInfo(info);
}
else
{
result = BlockUtil::buildRowCountChunk(rows);
}
result = BlockUtil::buildRowCountChunk(total_rows);
}
}

CLEAN_JNIENV
return result;
}
Expand Down
6 changes: 5 additions & 1 deletion cpp-ch/local-engine/Storages/SourceFromJavaIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once
#include <jni.h>
#include <Processors/ISource.h>
#include <Interpreters/Context.h>

namespace local_engine
{
Expand All @@ -29,7 +30,7 @@ class SourceFromJavaIter : public DB::ISource

static Int64 byteArrayToLong(JNIEnv * env, jbyteArray arr);

SourceFromJavaIter(DB::Block header, jobject java_iter_, bool materialize_input_);
SourceFromJavaIter(DB::ContextPtr context_, DB::Block header, jobject java_iter_, bool materialize_input_);
~SourceFromJavaIter() override;

String getName() const override { return "SourceFromJavaIter"; }
Expand All @@ -40,7 +41,10 @@ class SourceFromJavaIter : public DB::ISource

jobject java_iter;
bool materialize_input;
DB::ContextPtr context;
DB::Block original_header;

DB::Block pending_block;
};

}

0 comments on commit 305924a

Please sign in to comment.