Skip to content

Commit

Permalink
fix grace hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Sep 21, 2023
1 parent 7729447 commit c5ff4aa
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions src/Interpreters/GraceHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,59 +528,77 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream

Block nextImpl() override
{
if (!tmp_result || tmp_result->isFinished())
{
Block block;
size_t num_buckets = buckets.size();
size_t current_idx = buckets[current_bucket]->idx;

do
std::lock_guard lock(remaining_blocks_mutex);
if (!remaining_blocks.empty())
{
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
// There is a lock inside left_reader.read() .
block = left_reader.read();
if (!block)
return {};

// block comes from left_reader, need to join with right table to get the result.
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
block = std::move(blocks[current_idx]);

/*
* We need to filter out blocks that were written to the current bucket `B_{n}`
* but then virtually moved to another bucket `B_{n+i}` on rehash.
* Bucket `B_{n+i}` is waiting for the buckets with smaller index to be processed,
* and rows can be moved only forward (because we increase hash modulo twice on each rehash),
* so it is safe to add blocks.
*/
for (size_t bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx)
{
if (blocks[bucket_idx].rows() == 0)
continue;

if (bucket_idx == current_idx) // Rows that are still in our bucket
continue;

buckets[bucket_idx]->addLeftBlock(blocks[bucket_idx]);
}
} while (block.rows() == 0);

ExtraBlockPtr not_processed;

tmp_result = hash_join->joinBlockWithStreamOutput(block, not_processed);

if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");
auto res = remaining_blocks.front();
remaining_blocks.pop();
return res;
}
}
Block block;
size_t num_buckets = buckets.size();
size_t current_idx = buckets[current_bucket]->idx;

do
{
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
// There is a lock inside left_reader.read() .
block = left_reader.read();
if (!block)
return {};

// block comes from left_reader, need to join with right table to get the result.
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
block = std::move(blocks[current_idx]);

/*
* We need to filter out blocks that were written to the current bucket `B_{n}`
* but then virtually moved to another bucket `B_{n+i}` on rehash.
* Bucket `B_{n+i}` is waiting for the buckets with smaller index to be processed,
* and rows can be moved only forward (because we increase hash modulo twice on each rehash),
* so it is safe to add blocks.
*/
for (size_t bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx)
{
if (blocks[bucket_idx].rows() == 0)
continue;

if (bucket_idx == current_idx) // Rows that are still in our bucket
continue;

buckets[bucket_idx]->addLeftBlock(blocks[bucket_idx]);
}
} while (block.rows() == 0);

ExtraBlockPtr not_processed;
auto stream_output = hash_join->joinBlockWithStreamOutput(block, not_processed);

if (not_processed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unsupported hash join type");
std::vector<Block> blocks;
while (!stream_output->isFinished())
{
blocks.push_back(stream_output->next());
}
std::lock_guard lock(remaining_blocks_mutex);
for (const auto & item : blocks)
{
remaining_blocks.push(item);
}
return tmp_result->next();
auto res = remaining_blocks.front();
remaining_blocks.pop();
return res;
}

size_t current_bucket;
Buckets buckets;
InMemoryJoinPtr hash_join;

AccumulatedBlockReader left_reader;
IBlocksStreamPtr tmp_result;
std::queue<Block> remaining_blocks;
std::mutex remaining_blocks_mutex;

Names left_key_names;
Names right_key_names;
Expand Down

0 comments on commit c5ff4aa

Please sign in to comment.