Skip to content

Commit

Permalink
fix concurrent hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Sep 21, 2023
1 parent c5ff4aa commit 2607efc
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions src/Interpreters/ConcurrentHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,22 @@ void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr<ExtraBlock> &
{
Blocks dispatched_blocks = dispatchBlock(table_join->getOnlyClause().key_names_left, block);
block = {};
Blocks blocks;
for (size_t i = 0; i < dispatched_blocks.size(); ++i)
{
std::shared_ptr<ExtraBlock> none_extra_block;
auto & hash_join = hash_joins[i];
auto & dispatched_block = dispatched_blocks[i];
hash_join->data->joinBlock(dispatched_block, none_extra_block);
auto stream_output = hash_join->data->joinBlockWithStreamOutput(dispatched_block, none_extra_block);
while (!stream_output->isFinished())
{
blocks.emplace_back(stream_output->next());
}
if (none_extra_block && !none_extra_block->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "not_processed should be empty");
}

block = concatenateBlocks(dispatched_blocks);
block = concatenateBlocks(blocks);
}

void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const
Expand Down

0 comments on commit 2607efc

Please sign in to comment.