Skip to content

Commit

Permalink
optimize join lazy gen right column
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Oct 12, 2023
1 parent 1d40793 commit 946745e
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 141 deletions.
250 changes: 199 additions & 51 deletions src/Interpreters/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1055,9 +1055,11 @@ class AddedColumns

/// Number of entries in `columns`.
size_t size() const
{
    return columns.size();
}

/// Number of row references recorded so far in `lazy_right_columns`.
size_t lazy_row_size() const
{
    return lazy_right_columns.size();
}

ColumnWithTypeAndName moveColumn(size_t i)
{
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
return ColumnWithTypeAndName(columns[i]->cloneEmpty(), type_name[i].type, type_name[i].qualified_name);
}

static void assertBlockEqualsStructureUpToLowCard(const Block & lhs_block, const Block & rhs_block)
Expand Down Expand Up @@ -1088,63 +1090,75 @@ class AddedColumns
}
}

template <bool has_defaults>
// template <bool has_defaults>
void appendFromBlock(const Block & block, size_t row_num)
{
if constexpr (has_defaults)
applyLazyDefaults();
// if constexpr (has_defaults)
// applyLazyDefaults();

#ifndef NDEBUG
/// Like assertBlocksHaveEqualStructure but doesn't check low cardinality
assertBlockEqualsStructureUpToLowCard(sample_block, block);
#else
UNUSED(assertBlockEqualsStructureUpToLowCard);
#endif
lazy_right_columns.emplace_back(RowRef(&block, row_num));
}

if (is_join_get)
{
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
if (auto * nullable_col = typeid_cast<ColumnNullable *>(columns[j].get());
nullable_col && !column_from_block.column->isNullable())
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else if (auto * lowcard_col = typeid_cast<ColumnLowCardinality *>(columns[j].get());
lowcard_col && !typeid_cast<const ColumnLowCardinality *>(column_from_block.column.get()))
lowcard_col->insertFromFullColumn(*column_from_block.column, row_num);
else
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
else
ColumnsWithTypeAndName getColumns(size_t offset, size_t length)
{
ColumnsWithTypeAndName res;
for (size_t i = 0; i < this->size(); ++i)
{
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
MutableColumnPtr col = type_name[i].type->createColumn();
col->reserve(length);
size_t end = offset + length;
chassert(end <= lazy_right_columns.size());
for (size_t j = offset; j < end; ++j)
{
const auto & column_from_block = block.getByPosition(right_indexes[j]);
if (auto * lowcard_col = typeid_cast<ColumnLowCardinality *>(columns[j].get());
auto row = lazy_right_columns[j];
if (!row.block)
{
type_name[i].type->insertDefaultInto(*col);
continue;
}
const auto & column_from_block = row.block->getByPosition(right_indexes[i]);
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
if (is_join_get)
{
if (auto * nullable_col = typeid_cast<ColumnNullable *>(col.get());
nullable_col && !column_from_block.column->isNullable())
{
nullable_col->insertFromNotNullable(*column_from_block.column, row.row_num);
continue;
}
}
if (auto * lowcard_col = typeid_cast<ColumnLowCardinality *>(col.get());
lowcard_col && !typeid_cast<const ColumnLowCardinality *>(column_from_block.column.get()))
lowcard_col->insertFromFullColumn(*column_from_block.column, row_num);
lowcard_col->insertFromFullColumn(*column_from_block.column, row.row_num);
else
columns[j]->insertFrom(*column_from_block.column, row_num);
col->insertFrom(*column_from_block.column, row.row_num);
}
res.emplace_back(ColumnWithTypeAndName(std::move(col), type_name[i].type, type_name[i].qualified_name));
}
return res;
}

void appendDefaultRow()
{
++lazy_defaults_count;
lazy_right_columns.emplace_back(RowRef(nullptr, 0));
}

void applyLazyDefaults()
{
if (lazy_defaults_count)
{
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
lazy_defaults_count = 0;
}
}
// void applyLazyDefaults()
// {
// if (lazy_defaults_count)
// {
// for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
// JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count);
// lazy_defaults_count = 0;
// }
// }

/// Returns the column pointed to by `left_asof_key`.
/// The pointer is dereferenced without a check, so it must be non-null when this is called.
const IColumn & leftAsofKey() const
{
    return *left_asof_key;
}

Expand All @@ -1162,6 +1176,7 @@ class AddedColumns
/// for ASOF
const IColumn * left_asof_key = nullptr;
Block sample_block;
std::vector<RowRef> lazy_right_columns;

bool is_join_get;

Expand All @@ -1173,6 +1188,135 @@ class AddedColumns
}
};


/// Stream over the result of joining one left block.
/// It keeps a copy of the joined block and, optionally, the AddedColumns object holding
/// the recorded right-side row references, so right columns are built only when a block is emitted.
class StreamReplicateBlocks final: public IBlocksStream
{
public:
    explicit StreamReplicateBlocks(const Block & block);
    StreamReplicateBlocks(const Block & block, std::vector<size_t> right_col_idx, std::shared_ptr<AddedColumns> added_columns);
    StreamReplicateBlocks(const Block & block, std::vector<size_t> right_col_idx, std::shared_ptr<AddedColumns> added_columns, std::unique_ptr<IColumn::Offsets> offsets_to_replicate, const std::vector<size_t> & need_replicate_pos, size_t max_block_size);

protected:
    Block nextImpl() override;

private:
    /// Builds the right-side columns for rows [offset, offset + length) of the recorded row
    /// references and stores them into `block` at the positions listed in `right_col_idx`.
    /// A `length` of 0 means "every recorded row". Does nothing when there is no `added_columns`.
    void materializeAddedColumns(size_t offset, size_t length)
    {
        if (!added_columns)
            return;

        const size_t rows = length == 0 ? added_columns->lazy_row_size() : length;
        auto built = added_columns->getColumns(offset, rows);

        /// built[pos] belongs at block position right_col_idx[pos].
        for (size_t pos = 0; pos < right_col_idx.size(); ++pos)
            block.safeGetByPosition(right_col_idx[pos]).column = std::move(built[pos].column);
    }

    /// Joined block; right-side columns may still be empty placeholders until materialized.
    Block block;
    /// Positions in `block` that receive the lazily built right-side columns.
    std::vector<size_t> right_col_idx;
    /// Source of the lazily built right-side columns; may be null.
    std::shared_ptr<AddedColumns> added_columns;
    /// Cumulative replication offsets; may be null when no replication is required.
    std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
    /// Positions in `block` whose columns are replicated according to `offsets_to_replicate`.
    std::vector<size_t> need_replicate_pos;
    /// Index into `offsets_to_replicate` of the next source row to process.
    size_t offset_index = 0;
    /// Replicas of the row at `offset_index` that did not fit into the previous output block.
    size_t offset_remain_rows = 0;
    /// Number of result rows emitted so far.
    size_t output_rows = 0;
    /// Upper bound on rows per output block when the result is split.
    size_t max_block_size = DEFAULT_BLOCK_SIZE;
};


/// Creates a stream for a join result that needs replication.
///
/// @param block_                 joined block (copied).
/// @param right_col_idx_         positions in the block that receive the lazily built right columns.
/// @param added_columns_         holder of the recorded right-side row references.
/// @param offsets_to_replicate_  cumulative replication offsets; ownership is taken.
/// @param need_replicate_pos_    positions in the block whose columns must be replicated (copied).
/// @param max_block_size_        upper bound on rows per output block when the result is split.
StreamReplicateBlocks::StreamReplicateBlocks(
    const Block & block_,
    std::vector<size_t> right_col_idx_,
    std::shared_ptr<AddedColumns> added_columns_,
    std::unique_ptr<IColumn::Offsets> offsets_to_replicate_,
    const std::vector<size_t> & need_replicate_pos_,
    size_t max_block_size_)
    : block(block_)
    /// By-value parameters are sinks: move them to avoid a vector copy and an atomic refcount bump.
    , right_col_idx(std::move(right_col_idx_))
    , added_columns(std::move(added_columns_))
    , offsets_to_replicate(std::move(offsets_to_replicate_))
    , need_replicate_pos(need_replicate_pos_)
    , max_block_size(max_block_size_)
{
}



/// Produces the next block of the join result.
///
/// Three cases:
///  - nothing to replicate: the stored block is returned after its right columns are materialized;
///  - small result (or no usable block size limit): every column is replicated at once and one block is returned;
///  - otherwise: the replication offsets are consumed in slices of at most `max_block_size` result rows,
///    one slice per call. A source row whose replicas do not fit is continued in the next call
///    via `offset_remain_rows`.
/// `finished` is set once the last block has been produced.
Block StreamReplicateBlocks::nextImpl()
{
    if (need_replicate_pos.empty() || !offsets_to_replicate)
    {
        finished.store(true);
        materializeAddedColumns(0, 0);
        return block;
    }

    /// `back()` must not be called on an empty offsets array; an empty input yields zero result rows.
    const size_t total_rows = offsets_to_replicate->empty() ? 0 : offsets_to_replicate->back();

    /// With max_block_size == 0 the splitting loop below would never run, so every call would
    /// return an empty block and `finished` would never be set. Emit everything at once instead.
    /// NOTE(review): the small-result threshold is DEFAULT_BLOCK_SIZE, not max_block_size - confirm this is intended.
    if (max_block_size == 0 || total_rows < DEFAULT_BLOCK_SIZE)
    {
        finished.store(true);
        for (const auto & i : need_replicate_pos)
        {
            block.safeGetByPosition(i).column =
                block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
        }
        materializeAddedColumns(0, total_rows);
        return block;
    }

    IColumn::Offsets tmp_offsets_to_replicate;
    size_t replicate_rows = 0;
    const size_t start_offset = offset_index;

    /// Split offsets_to_replicate according to max_block_size.
    while (offset_index < offsets_to_replicate->size() && replicate_rows < max_block_size)
    {
        /// Offsets are cumulative: the replica count of a row is the difference to the previous offset.
        const auto current_offset_row = offset_index == 0
            ? (*offsets_to_replicate)[offset_index]
            : (*offsets_to_replicate)[offset_index] - (*offsets_to_replicate)[offset_index - 1];
        /// A row left unfinished by the previous call only owes its remaining replicas.
        const auto row = offset_remain_rows == 0 ? current_offset_row : offset_remain_rows;

        if (row + replicate_rows > max_block_size)
        {
            /// Only part of this row fits; stay on the same source row for the next call.
            const size_t remain_rows = max_block_size - replicate_rows;
            replicate_rows += remain_rows;
            tmp_offsets_to_replicate.emplace_back(replicate_rows);
            offset_remain_rows = row - remain_rows;
        }
        else
        {
            offset_remain_rows = 0;
            replicate_rows += row;
            tmp_offsets_to_replicate.emplace_back(replicate_rows);
            ++offset_index;
        }
    }

    Block res = block.cloneEmpty();

    /// Replicated columns: take the source rows covered by this slice and expand them.
    for (const auto & i : need_replicate_pos)
    {
        res.safeGetByPosition(i).column =
            block.safeGetByPosition(i).column->cut(start_offset, tmp_offsets_to_replicate.size())->replicate(tmp_offsets_to_replicate);
    }

    if (!added_columns)
    {
        /// Columns that are not replicated are already at result length, so they are cut by result-row position.
        const std::set<size_t> visited_pos(need_replicate_pos.begin(), need_replicate_pos.end());
        for (size_t i = 0; i < block.columns(); ++i)
        {
            if (!visited_pos.contains(i))
                res.safeGetByPosition(i).column =
                    block.safeGetByPosition(i).column->cut(output_rows, replicate_rows);
        }
    }
    else
    {
        /// Right-side columns are built only for the result rows of this slice.
        auto columns = added_columns->getColumns(output_rows, replicate_rows);
        size_t i = 0;
        for (const auto & idx : right_col_idx)
        {
            res.safeGetByPosition(idx).column = std::move(columns[i].column);
            ++i;
        }
    }

    output_rows += replicate_rows;
    if (offset_index >= offsets_to_replicate->size())
        finished.store(true);
    return res;
}
/// Creates a stream for a join result that needs no replication but still has lazily built right columns.
///
/// @param block_          joined block (copied).
/// @param right_col_idx_  positions in the block that receive the lazily built right columns.
/// @param added_columns_  holder of the recorded right-side row references.
StreamReplicateBlocks::StreamReplicateBlocks(
    const Block & block_,
    std::vector<size_t> right_col_idx_,
    std::shared_ptr<AddedColumns> added_columns_)
    : block(block_)
    /// By-value parameters are sinks: move them to avoid a vector copy and an atomic refcount bump.
    , right_col_idx(std::move(right_col_idx_))
    , added_columns(std::move(added_columns_))
{
}
/// Creates a stream over a copy of `block_` alone; all other members keep their in-class defaults.
StreamReplicateBlocks::StreamReplicateBlocks(const Block & block_)
    : block(block_)
{
}

template <JoinKind KIND, JoinStrictness STRICTNESS>
struct JoinFeatures
{
Expand Down Expand Up @@ -1278,8 +1422,8 @@ void addFoundRowAll(
KnownRowsHolder<multiple_disjuncts> & known_rows [[maybe_unused]],
JoinStuff::JoinUsedFlags * used_flags [[maybe_unused]])
{
if constexpr (add_missing)
added.applyLazyDefaults();
// if constexpr (add_missing)
// added.applyLazyDefaults();

if constexpr (multiple_disjuncts)
{
Expand All @@ -1289,7 +1433,7 @@ void addFoundRowAll(
{
if (!known_rows.isKnown(std::make_pair(it->block, it->row_num)))
{
added.appendFromBlock<false>(*it->block, it->row_num);
added.appendFromBlock(*it->block, it->row_num);
++current_offset;
if (!new_known_rows_ptr)
{
Expand All @@ -1313,7 +1457,7 @@ void addFoundRowAll(
{
for (auto it = mapped.begin(); it.ok(); ++it)
{
added.appendFromBlock<false>(*it->block, it->row_num);
added.appendFromBlock(*it->block, it->row_num);
++current_offset;
}
}
Expand Down Expand Up @@ -1392,7 +1536,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
else
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);

added_columns.appendFromBlock<join_features.add_missing>(*row_ref.block, row_ref.row_num);
added_columns.appendFromBlock(*row_ref.block, row_ref.row_num);
}
else
addNotFoundRow<join_features.add_missing, join_features.need_replication>(added_columns, current_offset);
Expand Down Expand Up @@ -1423,7 +1567,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
if (used_once)
{
setUsed<need_filter>(filter, i);
added_columns.appendFromBlock<join_features.add_missing>(*mapped.block, mapped.row_num);
added_columns.appendFromBlock(*mapped.block, mapped.row_num);
}

break;
Expand All @@ -1441,7 +1585,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
{
setUsed<need_filter>(filter, i);
used_flags.template setUsed<join_features.need_flags, multiple_disjuncts>(find_result);
added_columns.appendFromBlock<join_features.add_missing>(*mapped.block, mapped.row_num);
added_columns.appendFromBlock(*mapped.block, mapped.row_num);

if (join_features.is_any_or_semi_join)
{
Expand All @@ -1464,7 +1608,7 @@ NO_INLINE IColumn::Filter joinRightColumns(
}
}

added_columns.applyLazyDefaults();
// added_columns.applyLazyDefaults();
return filter;
}

Expand Down Expand Up @@ -1581,7 +1725,7 @@ IBlocksStreamPtr HashJoin::joinBlockImpl(
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
AddedColumns added_columns(
std::shared_ptr<AddedColumns> added_columns = std::make_shared<AddedColumns>(
block_with_columns_to_add,
block,
savedBlockSample(),
Expand All @@ -1591,12 +1735,16 @@ IBlocksStreamPtr HashJoin::joinBlockImpl(
is_join_get);

bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
added_columns->need_filter = join_features.need_filter || has_required_right_keys;

IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, used_flags);
IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(maps_, *added_columns, data->type, used_flags);

for (size_t i = 0; i < added_columns.size(); ++i)
block.insert(added_columns.moveColumn(i));
std::vector<size_t> right_col_idx;
for (size_t i = 0; i < added_columns->size(); ++i)
{
block.insert(added_columns->moveColumn(i));
right_col_idx.emplace_back(existing_columns + i);
}

std::vector<size_t> right_keys_to_replicate [[maybe_unused]];

Expand Down Expand Up @@ -1670,16 +1818,16 @@ IBlocksStreamPtr HashJoin::joinBlockImpl(

if constexpr (join_features.need_replication)
{
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns.offsets_to_replicate;
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate = added_columns->offsets_to_replicate;
std::vector<size_t> need_replicate_columns;
for (size_t i = 0; i < existing_columns; ++i)
need_replicate_columns.emplace_back(i);
need_replicate_columns.insert(need_replicate_columns.end(), right_keys_to_replicate.begin(), right_keys_to_replicate.end());
return std::make_shared<StreamReplicateBlocks>(block, std::move(offsets_to_replicate), need_replicate_columns, table_join->maxJoinedBlockRows());
return std::make_shared<StreamReplicateBlocks>(block, right_col_idx, added_columns, std::move(offsets_to_replicate), need_replicate_columns, table_join->maxJoinedBlockRows());
}
else
{
return std::make_shared<StreamReplicateBlocks>(block);
return std::make_shared<StreamReplicateBlocks>(block, right_col_idx, added_columns);
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/Interpreters/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,6 @@ class HashJoin : public IJoin
bool shrink_blocks = false;
Int64 memory_usage_before_adding_blocks = 0;

std::shared_ptr<StreamReplicateBlocks> current_result;

Poco::Logger * log;

/// Should be set via setLock to protect hash table from modification from StorageJoin
Expand Down
Loading

0 comments on commit 946745e

Please sign in to comment.