diff --git a/src/Interpreters/HashJoin.cpp b/src/Interpreters/HashJoin.cpp index 3c7bbf499adf..8d93ee7e26aa 100644 --- a/src/Interpreters/HashJoin.cpp +++ b/src/Interpreters/HashJoin.cpp @@ -1055,9 +1055,11 @@ class AddedColumns size_t size() const { return columns.size(); } + size_t lazy_row_size() const { return lazy_right_columns.size(); } + ColumnWithTypeAndName moveColumn(size_t i) { - return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name); + return ColumnWithTypeAndName(columns[i]->cloneEmpty(), type_name[i].type, type_name[i].qualified_name); } static void assertBlockEqualsStructureUpToLowCard(const Block & lhs_block, const Block & rhs_block) @@ -1088,11 +1090,11 @@ class AddedColumns } } - template +// template void appendFromBlock(const Block & block, size_t row_num) { - if constexpr (has_defaults) - applyLazyDefaults(); +// if constexpr (has_defaults) +// applyLazyDefaults(); #ifndef NDEBUG /// Like assertBlocksHaveEqualStructure but doesn't check low cardinality @@ -1100,51 +1102,63 @@ class AddedColumns #else UNUSED(assertBlockEqualsStructureUpToLowCard); #endif + lazy_right_columns.emplace_back(RowRef(&block, row_num)); + } - if (is_join_get) - { - /// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin. - for (size_t j = 0, size = right_indexes.size(); j < size; ++j) - { - const auto & column_from_block = block.getByPosition(right_indexes[j]); - if (auto * nullable_col = typeid_cast(columns[j].get()); - nullable_col && !column_from_block.column->isNullable()) - nullable_col->insertFromNotNullable(*column_from_block.column, row_num); - else if (auto * lowcard_col = typeid_cast(columns[j].get()); - lowcard_col && !typeid_cast(column_from_block.column.get())) - lowcard_col->insertFromFullColumn(*column_from_block.column, row_num); - else - columns[j]->insertFrom(*column_from_block.column, row_num); - } - } - else + ColumnsWithTypeAndName getColumns(size_t offset, size_t length) + { + ColumnsWithTypeAndName res; + for (size_t i = 0; i < this->size(); ++i) { - for (size_t j = 0, size = right_indexes.size(); j < size; ++j) + MutableColumnPtr col = type_name[i].type->createColumn(); + col->reserve(length); + size_t end = offset + length; + chassert(end <= lazy_right_columns.size()); + for (size_t j = offset; j < end; ++j) { - const auto & column_from_block = block.getByPosition(right_indexes[j]); - if (auto * lowcard_col = typeid_cast(columns[j].get()); + auto row = lazy_right_columns[j]; + if (!row.block) + { + type_name[i].type->insertDefaultInto(*col); + continue; + } + const auto & column_from_block = row.block->getByPosition(right_indexes[i]); + /// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin. + if (is_join_get) + { + if (auto * nullable_col = typeid_cast(col.get()); + nullable_col && !column_from_block.column->isNullable()) + { + nullable_col->insertFromNotNullable(*column_from_block.column, row.row_num); + continue; + } + } + if (auto * lowcard_col = typeid_cast(col.get()); lowcard_col && !typeid_cast(column_from_block.column.get())) - lowcard_col->insertFromFullColumn(*column_from_block.column, row_num); + lowcard_col->insertFromFullColumn(*column_from_block.column, row.row_num); else - columns[j]->insertFrom(*column_from_block.column, row_num); + col->insertFrom(*column_from_block.column, row.row_num); } + res.emplace_back(ColumnWithTypeAndName(std::move(col), type_name[i].type, type_name[i].qualified_name)); } + return res; } void appendDefaultRow() { ++lazy_defaults_count; + lazy_right_columns.emplace_back(RowRef(nullptr, 0)); } - void applyLazyDefaults() - { - if (lazy_defaults_count) - { - for (size_t j = 0, size = right_indexes.size(); j < size; ++j) - JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count); - lazy_defaults_count = 0; - } - } +// void applyLazyDefaults() +// { +// if (lazy_defaults_count) +// { +// for (size_t j = 0, size = right_indexes.size(); j < size; ++j) +// JoinCommon::addDefaultValues(*columns[j], type_name[j].type, lazy_defaults_count); +// lazy_defaults_count = 0; +// } +// } const IColumn & leftAsofKey() const { return *left_asof_key; } @@ -1162,6 +1176,7 @@ class AddedColumns /// for ASOF const IColumn * left_asof_key = nullptr; Block sample_block; + std::vector lazy_right_columns; bool is_join_get; @@ -1173,6 +1188,135 @@ class AddedColumns } }; + +class StreamReplicateBlocks final: public IBlocksStream +{ +public: + explicit StreamReplicateBlocks(const Block & block); + StreamReplicateBlocks(const Block & block, std::vector right_col_idx, std::shared_ptr added_columns); + StreamReplicateBlocks(const Block & block, std::vector right_col_idx, std::shared_ptr added_columns, std::unique_ptr offsets_to_replicate, const std::vector & need_replicate_pos, size_t max_block_size); + +protected: + Block nextImpl() override; + +private: + void materializeAddedColumns(size_t offset, size_t length) + { + if (added_columns) + { + if (length == 0) + length = added_columns->lazy_row_size(); + auto columns = added_columns->getColumns(offset, length); + size_t i = 0; + for (const auto & idx : right_col_idx) + { + block.safeGetByPosition(idx).column = std::move(columns[i].column); + i++; + } + } + } + + Block block; + std::vector right_col_idx; + std::shared_ptr added_columns; + std::unique_ptr offsets_to_replicate; + std::vector need_replicate_pos; + size_t offset_index = 0; + size_t offset_remain_rows = 0; + size_t output_rows = 0; + size_t max_block_size = DEFAULT_BLOCK_SIZE; +}; + + +StreamReplicateBlocks::StreamReplicateBlocks(const Block & block_, std::vector right_col_idx_, std::shared_ptr added_columns_, std::unique_ptr offsets_to_replicate_, const std::vector & need_replicate_pos_, size_t max_block_size_) + : block(block_), right_col_idx(right_col_idx_), added_columns(added_columns_), offsets_to_replicate(std::move(offsets_to_replicate_)), need_replicate_pos(need_replicate_pos_), max_block_size(max_block_size_) +{ +} + + + +Block StreamReplicateBlocks::nextImpl() +{ + if (need_replicate_pos.empty() || !offsets_to_replicate) + { + finished.store(true); + materializeAddedColumns(0, 0); + return block; + } + if (offsets_to_replicate->back() < DEFAULT_BLOCK_SIZE) + { + finished.store(true); + for (const auto & i : need_replicate_pos) + { + block.safeGetByPosition(i).column = + block.safeGetByPosition(i).column->replicate(*offsets_to_replicate); + } + materializeAddedColumns(0, offsets_to_replicate->back()); + return block; + } + IColumn::Offsets tmp_offsets_to_replicate; + size_t replicate_rows = 0; + size_t start_offset = offset_index; + // Split offsets_to_replicate according to max_block_size + while (offset_index < offsets_to_replicate->size() && replicate_rows < max_block_size) + { + auto current_offset_row = offset_index == 0 ? (*offsets_to_replicate)[offset_index] : (*offsets_to_replicate)[offset_index] - (*offsets_to_replicate)[offset_index - 1]; + auto row = offset_remain_rows == 0 ? current_offset_row : offset_remain_rows; + if (row + replicate_rows > max_block_size) + { + size_t remain_rows = max_block_size - replicate_rows; + replicate_rows += remain_rows; + tmp_offsets_to_replicate.emplace_back(replicate_rows); + offset_remain_rows = row - remain_rows; + } + else + { + offset_remain_rows = 0; + replicate_rows += row; + tmp_offsets_to_replicate.emplace_back(replicate_rows); + offset_index ++; + } + } + Block res = block.cloneEmpty(); + std::set visited_pos; + // block that generates output + for (const auto & i : need_replicate_pos) + { + visited_pos.emplace(i); + res.safeGetByPosition(i).column = + block.safeGetByPosition(i).column->cut(start_offset,tmp_offsets_to_replicate.size())->replicate(tmp_offsets_to_replicate); + } + // Additional splitting is required for columns that do not need to be replicated. + if (!added_columns) + { + for (size_t i = 0; i < block.columns(); ++i) + { + if (!visited_pos.contains(i)) + res.safeGetByPosition(i).column = + block.safeGetByPosition(i).column->cut(output_rows,replicate_rows); + } + } + else + { + auto columns = added_columns->getColumns(output_rows, replicate_rows); + size_t i = 0; + for (const auto & idx : right_col_idx) + { + res.safeGetByPosition(idx).column = std::move(columns[i].column); + i++; + } + } + output_rows += replicate_rows; + if (offset_index >= offsets_to_replicate->size()) finished.store(true); + return res; +} +StreamReplicateBlocks::StreamReplicateBlocks(const Block & block_, std::vector right_col_idx_, std::shared_ptr added_columns_) : block(block_), right_col_idx(right_col_idx_), added_columns(added_columns_) +{ +} +StreamReplicateBlocks::StreamReplicateBlocks(const Block & block_) : block(block_) +{ +} + template struct JoinFeatures { @@ -1278,8 +1422,8 @@ void addFoundRowAll( KnownRowsHolder & known_rows [[maybe_unused]], JoinStuff::JoinUsedFlags * used_flags [[maybe_unused]]) { - if constexpr (add_missing) - added.applyLazyDefaults(); +// if constexpr (add_missing) +// added.applyLazyDefaults(); if constexpr (multiple_disjuncts) { @@ -1289,7 +1433,7 @@ void addFoundRowAll( { if (!known_rows.isKnown(std::make_pair(it->block, it->row_num))) { - added.appendFromBlock(*it->block, it->row_num); + added.appendFromBlock(*it->block, it->row_num); ++current_offset; if (!new_known_rows_ptr) { @@ -1313,7 +1457,7 @@ void addFoundRowAll( { for (auto it = mapped.begin(); it.ok(); ++it) { - added.appendFromBlock(*it->block, it->row_num); + added.appendFromBlock(*it->block, it->row_num); ++current_offset; } } @@ -1392,7 +1536,7 @@ NO_INLINE IColumn::Filter joinRightColumns( else used_flags.template setUsed(find_result); - added_columns.appendFromBlock(*row_ref.block, row_ref.row_num); + added_columns.appendFromBlock(*row_ref.block, row_ref.row_num); } else addNotFoundRow(added_columns, current_offset); @@ -1423,7 +1567,7 @@ NO_INLINE IColumn::Filter joinRightColumns( if (used_once) { setUsed(filter, i); - added_columns.appendFromBlock(*mapped.block, mapped.row_num); + added_columns.appendFromBlock(*mapped.block, mapped.row_num); } break; @@ -1441,7 +1585,7 @@ NO_INLINE IColumn::Filter joinRightColumns( { setUsed(filter, i); used_flags.template setUsed(find_result); - added_columns.appendFromBlock(*mapped.block, mapped.row_num); + added_columns.appendFromBlock(*mapped.block, mapped.row_num); if (join_features.is_any_or_semi_join) { @@ -1464,7 +1608,7 @@ NO_INLINE IColumn::Filter joinRightColumns( } } - added_columns.applyLazyDefaults(); +// added_columns.applyLazyDefaults(); return filter; } @@ -1581,7 +1725,7 @@ IBlocksStreamPtr HashJoin::joinBlockImpl( * but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped. * For ASOF, the last column is used as the ASOF column */ - AddedColumns added_columns( + std::shared_ptr added_columns = std::make_shared( block_with_columns_to_add, block, savedBlockSample(), @@ -1591,12 +1735,16 @@ IBlocksStreamPtr HashJoin::joinBlockImpl( is_join_get); bool has_required_right_keys = (required_right_keys.columns() != 0); - added_columns.need_filter = join_features.need_filter || has_required_right_keys; + added_columns->need_filter = join_features.need_filter || has_required_right_keys; - IColumn::Filter row_filter = switchJoinRightColumns(maps_, added_columns, data->type, used_flags); + IColumn::Filter row_filter = switchJoinRightColumns(maps_, *added_columns, data->type, used_flags); - for (size_t i = 0; i < added_columns.size(); ++i) - block.insert(added_columns.moveColumn(i)); + std::vector right_col_idx; + for (size_t i = 0; i < added_columns->size(); ++i) + { + block.insert(added_columns->moveColumn(i)); + right_col_idx.emplace_back(existing_columns + i); + } std::vector right_keys_to_replicate [[maybe_unused]]; @@ -1670,16 +1818,16 @@ IBlocksStreamPtr HashJoin::joinBlockImpl( if constexpr (join_features.need_replication) { - std::unique_ptr & offsets_to_replicate = added_columns.offsets_to_replicate; + std::unique_ptr & offsets_to_replicate = added_columns->offsets_to_replicate; std::vector need_replicate_columns; for (size_t i = 0; i < existing_columns; ++i) need_replicate_columns.emplace_back(i); need_replicate_columns.insert(need_replicate_columns.end(), right_keys_to_replicate.begin(), right_keys_to_replicate.end()); - return std::make_shared(block, std::move(offsets_to_replicate), need_replicate_columns, table_join->maxJoinedBlockRows()); + return std::make_shared(block, right_col_idx, added_columns, std::move(offsets_to_replicate), need_replicate_columns, table_join->maxJoinedBlockRows()); } else { - return std::make_shared(block); + return std::make_shared(block, right_col_idx, added_columns); } } diff --git a/src/Interpreters/HashJoin.h b/src/Interpreters/HashJoin.h index 45ffaa40e766..d2ec00da3fd2 100644 --- a/src/Interpreters/HashJoin.h +++ b/src/Interpreters/HashJoin.h @@ -449,8 +449,6 @@ class HashJoin : public IJoin bool shrink_blocks = false; Int64 memory_usage_before_adding_blocks = 0; - std::shared_ptr current_result; - Poco::Logger * log; /// Should be set via setLock to protect hash table from modification from StorageJoin diff --git a/src/Interpreters/JoinUtils.cpp b/src/Interpreters/JoinUtils.cpp index 7addf9984c86..f20c99ad6e17 100644 --- a/src/Interpreters/JoinUtils.cpp +++ b/src/Interpreters/JoinUtils.cpp @@ -871,73 +871,4 @@ Block NotJoinedBlocks::nextImpl() return result_block; } -StreamReplicateBlocks::StreamReplicateBlocks(const Block & block_, std::unique_ptr offsets_to_replicate_, const std::vector & need_replicate_pos_, size_t max_block_size_) - : block(block_), offsets_to_replicate(std::move(offsets_to_replicate_)), need_replicate_pos(need_replicate_pos_), max_block_size(max_block_size_) -{ -} - -Block StreamReplicateBlocks::nextImpl() -{ - if (need_replicate_pos.empty() || !offsets_to_replicate) - { - finished.store(true); - return block; - } - if (offsets_to_replicate->back() < DEFAULT_BLOCK_SIZE) - { - finished.store(true); - for (const auto & i : need_replicate_pos) - { - block.safeGetByPosition(i).column = - block.safeGetByPosition(i).column->replicate(*offsets_to_replicate); - } - return block; - } - IColumn::Offsets tmp_offsets_to_replicate; - size_t replicate_rows = 0; - size_t start_offset = offset_index; - // Split offsets_to_replicate according to max_block_size - while (offset_index < offsets_to_replicate->size() && replicate_rows < max_block_size) - { - auto current_offset_row = offset_index == 0 ? (*offsets_to_replicate)[offset_index] : (*offsets_to_replicate)[offset_index] - (*offsets_to_replicate)[offset_index - 1]; - auto row = offset_remain_rows == 0 ? current_offset_row : offset_remain_rows; - if (row + replicate_rows > max_block_size) - { - size_t remain_rows = max_block_size - replicate_rows; - replicate_rows += remain_rows; - tmp_offsets_to_replicate.emplace_back(replicate_rows); - offset_remain_rows = row - remain_rows; - } - else - { - offset_remain_rows = 0; - replicate_rows += row; - tmp_offsets_to_replicate.emplace_back(replicate_rows); - offset_index ++; - } - } - Block res = block.cloneEmpty(); - std::set visited_pos; - // block that generates output - for (const auto & i : need_replicate_pos) - { - visited_pos.emplace(i); - res.safeGetByPosition(i).column = - block.safeGetByPosition(i).column->cut(start_offset,tmp_offsets_to_replicate.size())->replicate(tmp_offsets_to_replicate); - } - // Additional splitting is required for columns that do not need to be replicated. - for (size_t i = 0; i < block.columns(); ++i) - { - if (!visited_pos.contains(i)) - res.safeGetByPosition(i).column = - block.safeGetByPosition(i).column->cut(output_rows,replicate_rows); - } - output_rows += replicate_rows; - if (offset_index >= offsets_to_replicate->size()) finished.store(true); - return res; -} -StreamReplicateBlocks::StreamReplicateBlocks(const Block & block_) : block(block_) -{ -} - } diff --git a/src/Interpreters/JoinUtils.h b/src/Interpreters/JoinUtils.h index 3ce1c5ea6409..f112ca22e5b7 100644 --- a/src/Interpreters/JoinUtils.h +++ b/src/Interpreters/JoinUtils.h @@ -172,23 +172,4 @@ class NotJoinedBlocks final : public IBlocksStream void setRightIndex(size_t right_pos, size_t result_position); }; -class StreamReplicateBlocks final: public IBlocksStream -{ -public: - explicit StreamReplicateBlocks(const Block & block); - StreamReplicateBlocks(const Block & block, std::unique_ptr offsets_to_replicate, const std::vector & need_replicate_pos, size_t max_block_size); - -protected: - Block nextImpl() override; - -private: - Block block; - std::unique_ptr offsets_to_replicate; - std::vector need_replicate_pos; - size_t offset_index = 0; - size_t offset_remain_rows = 0; - size_t output_rows = 0; - size_t max_block_size = DEFAULT_BLOCK_SIZE; -}; - }