diff --git a/src/Interpreters/GraceHashJoin.cpp b/src/Interpreters/GraceHashJoin.cpp
index 2a22833f29de..9d12525e408c 100644
--- a/src/Interpreters/GraceHashJoin.cpp
+++ b/src/Interpreters/GraceHashJoin.cpp
@@ -528,51 +528,68 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream
 
     Block nextImpl() override
     {
-        if (!tmp_result || tmp_result->isFinished())
         {
-            Block block;
-            size_t num_buckets = buckets.size();
-            size_t current_idx = buckets[current_bucket]->idx;
-
-            do
+            std::lock_guard lock(remaining_blocks_mutex);
+            if (!remaining_blocks.empty())
             {
-                // One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
-                // There is a lock inside left_reader.read() .
-                block = left_reader.read();
-                if (!block)
-                    return {};
-
-                // block comes from left_reader, need to join with right table to get the result.
-                Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
-                block = std::move(blocks[current_idx]);
-
-                /*
-                 * We need to filter out blocks that were written to the current bucket `B_{n}`
-                 * but then virtually moved to another bucket `B_{n+i}` on rehash.
-                 * Bucket `B_{n+i}` is waiting for the buckets with smaller index to be processed,
-                 * and rows can be moved only forward (because we increase hash modulo twice on each rehash),
-                 * so it is safe to add blocks.
-                 */
-                for (size_t bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx)
-                {
-                    if (blocks[bucket_idx].rows() == 0)
-                        continue;
-
-                    if (bucket_idx == current_idx) // Rows that are still in our bucket
-                        continue;
-
-                    buckets[bucket_idx]->addLeftBlock(blocks[bucket_idx]);
-                }
-            } while (block.rows() == 0);
-
-            ExtraBlockPtr not_processed;
-
-            tmp_result = hash_join->joinBlockWithStreamOutput(block, not_processed);
-
-            if (not_processed)
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");
+                auto res = remaining_blocks.front();
+                remaining_blocks.pop();
+                return res;
+            }
+        }
+        Block block;
+        size_t num_buckets = buckets.size();
+        size_t current_idx = buckets[current_bucket]->idx;
+
+        do
+        {
+            // One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
+            // There is a lock inside left_reader.read() .
+            block = left_reader.read();
+            if (!block)
+                return {};
+
+            // block comes from left_reader, need to join with right table to get the result.
+            Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
+            block = std::move(blocks[current_idx]);
+
+            /*
+             * We need to filter out blocks that were written to the current bucket `B_{n}`
+             * but then virtually moved to another bucket `B_{n+i}` on rehash.
+             * Bucket `B_{n+i}` is waiting for the buckets with smaller index to be processed,
+             * and rows can be moved only forward (because we increase hash modulo twice on each rehash),
+             * so it is safe to add blocks.
+             */
+            for (size_t bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx)
+            {
+                if (blocks[bucket_idx].rows() == 0)
+                    continue;
+
+                if (bucket_idx == current_idx) // Rows that are still in our bucket
+                    continue;
+
+                buckets[bucket_idx]->addLeftBlock(blocks[bucket_idx]);
+            }
+        } while (block.rows() == 0);
+
+        ExtraBlockPtr not_processed;
+        auto stream_output = hash_join->joinBlockWithStreamOutput(block, not_processed);
+
+        if (not_processed)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");
+        std::vector<Block> blocks;
+        while (!stream_output->isFinished())
+        {
+            blocks.push_back(stream_output->next());
+        }
+        std::lock_guard lock(remaining_blocks_mutex);
+        for (const auto & item : blocks)
+        {
+            remaining_blocks.push(item);
         }
-        return tmp_result->next();
+        auto res = remaining_blocks.front();
+        remaining_blocks.pop();
+        return res;
     }
 
     size_t current_bucket;
@@ -580,7 +597,8 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream
     InMemoryJoinPtr hash_join;
 
     AccumulatedBlockReader left_reader;
-    IBlocksStreamPtr tmp_result;
+    std::queue<Block> remaining_blocks;
+    std::mutex remaining_blocks_mutex;
 
     Names left_key_names;
     Names right_key_names;
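For orientation, here is a minimal standalone sketch (plain C++, not ClickHouse code) of the pattern the patch switches `nextImpl()` to: a single call into the underlying join may emit several output blocks, so all of them are parked in a mutex-protected `std::queue` and handed out one per call, even when several worker threads share the same stream. Every name in the sketch (`BufferedBlockStream`, the `Block` alias, the chunk counter) is invented for illustration and does not exist in the ClickHouse sources.

```cpp
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Stand-in for a joined output block.
using Block = std::string;

class BufferedBlockStream
{
public:
    // Returns one block per call; empty optional means the input is exhausted.
    std::optional<Block> next()
    {
        {
            // Fast path: hand out a block buffered by an earlier call.
            std::lock_guard lock(mutex);
            if (!remaining.empty())
            {
                Block res = std::move(remaining.front());
                remaining.pop();
                return res;
            }
        }

        // Pull the next input chunk; here it is just a counter.
        int chunk;
        {
            std::lock_guard lock(input_mutex);
            if (next_chunk >= total_chunks)
                return std::nullopt;
            chunk = next_chunk++;
        }

        // One input chunk can expand into several output blocks
        // (the analogue of joinBlockWithStreamOutput producing a stream).
        std::vector<Block> produced;
        for (int i = 0; i < 3; ++i)
            produced.push_back("chunk " + std::to_string(chunk) + " / part " + std::to_string(i));

        // Buffer everything, then return the first block; the lock is held
        // across push and pop so no other thread can drain the queue in between.
        std::lock_guard lock(mutex);
        for (auto & block : produced)
            remaining.push(std::move(block));

        Block res = std::move(remaining.front());
        remaining.pop();
        return res;
    }

private:
    std::mutex mutex;
    std::queue<Block> remaining;

    std::mutex input_mutex;
    int next_chunk = 0;
    int total_chunks = 4;
};

int main()
{
    BufferedBlockStream stream;
    std::mutex print_mutex;

    // Two workers share one stream, mirroring several
    // DelayedJoinedBlocksWorkerTransform instances sharing one DelayedBlocks.
    auto worker = [&]
    {
        while (auto block = stream.next())
        {
            std::lock_guard lock(print_mutex);
            std::cout << *block << '\n';
        }
    };

    std::vector<std::thread> workers;
    for (int i = 0; i < 2; ++i)
        workers.emplace_back(worker);
    for (auto & t : workers)
        t.join();
}
```

The detail mirrored from the patch is that the lock on the queue is held from the moment the freshly produced blocks are pushed until one of them is popped for return, so a concurrent caller cannot observe the buffer in a half-filled state or steal the block the producer is about to return.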