From 2607efccabaae7355af223276df9176ea1459411 Mon Sep 17 00:00:00 2001 From: liuneng <1398775315@qq.com> Date: Wed, 20 Sep 2023 15:04:51 +0800 Subject: [PATCH] fix concurrent hash join --- src/Interpreters/ConcurrentHashJoin.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Interpreters/ConcurrentHashJoin.cpp b/src/Interpreters/ConcurrentHashJoin.cpp index 1a8e0ad96fa4..b6a524cc3eb7 100644 --- a/src/Interpreters/ConcurrentHashJoin.cpp +++ b/src/Interpreters/ConcurrentHashJoin.cpp @@ -97,17 +97,22 @@ void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr & { Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, block); block = {}; + Blocks blocks; for (size_t i = 0; i < dispatched_blocks.size(); ++i) { std::shared_ptr none_extra_block; auto & hash_join = hash_joins[i]; auto & dispatched_block = dispatched_blocks[i]; - hash_join->data->joinBlock(dispatched_block, none_extra_block); + auto stream_output = hash_join->data->joinBlockWithStreamOutput(dispatched_block, none_extra_block); + while (!stream_output->isFinished()) + { + blocks.emplace_back(stream_output->next()); + } if (none_extra_block && !none_extra_block->empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "not_processed should be empty"); } - block = concatenateBlocks(dispatched_blocks); + block = concatenateBlocks(blocks); } void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const