From 9f4fc328bb9026ef451bbba0313d29e96e55873a Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Tue, 20 Apr 2021 21:15:12 +0800 Subject: [PATCH 1/2] [Enhancement] Reduce memory consumption by release readers earier We created multiple rowset readers to read data of one tablet, after one rowset reader has reached EOF, it can be released to reduce resource (typicallly memory) comsumption. As the same, we can release segment reader when it reach EOF. --- be/src/olap/collect_iterator.cpp | 82 +++++++------ be/src/olap/collect_iterator.h | 45 ++++--- be/src/olap/compaction.cpp | 12 +- be/src/olap/generic_iterators.cpp | 74 +++++++----- be/src/olap/generic_iterators.h | 4 +- be/src/olap/merger.cpp | 1 + be/src/olap/reader.cpp | 64 +++++----- be/src/olap/reader.h | 5 +- be/src/olap/row_block.h | 2 + be/src/olap/row_cursor.cpp | 1 + be/src/olap/rowset/alpha_rowset_reader.cpp | 112 ++++++++---------- be/src/olap/rowset/alpha_rowset_reader.h | 12 +- be/src/olap/rowset/beta_rowset_reader.cpp | 2 +- .../olap/rowset/unique_rowset_id_generator.h | 1 + be/src/olap/storage_engine.cpp | 1 - be/test/olap/generic_iterators_test.cpp | 4 +- 16 files changed, 221 insertions(+), 201 deletions(-) diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index cf9fdbedf4b610..5fce312cc1d0f6 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -44,55 +44,62 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { return OLAP_ERR_DATA_EOF; } - LevelIterator* child_ptr = child.release(); - _children.push_back(child_ptr); + _children.push_back(child.release()); return OLAP_SUCCESS; } // Build a merge heap. If _merge is true, a rowset with the max rownum // status will be used as the base rowset, and the other rowsets will be merged first and // then merged with the base rowset. -void CollectIterator::build_heap() { - DCHECK(_reader->_rs_readers.size() == _children.size()); +void CollectIterator::build_heap(const std::vector& rs_readers) { + DCHECK(rs_readers.size() == _children.size()); _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; if (_children.empty()) { _inner_iter.reset(nullptr); return; } else if (_merge) { - DCHECK(!_reader->_rs_readers.empty()); - // build merge heap with two children, a base rowset as level0iterator and + DCHECK(!rs_readers.empty()); + // build merge heap with two children, a base rowset as level0iterator and // other cumulative rowsets as a level1iterator if (_children.size() > 1) { - // find base rowset(max rownum), - RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0]; + // find 'base rowset', 'base rowset' is the rowset which contains the max row number + int64_t max_row_num = 0; int base_reader_idx = 0; - for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) { - if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() > - base_reader->rowset()->rowset_meta()->num_rows()) { - base_reader = _reader->_rs_readers[i]; + for (size_t i = 0; i < rs_readers.size(); ++i) { + int64_t cur_row_num = rs_readers[i]->rowset()->rowset_meta()->num_rows(); + if (cur_row_num > max_row_num) { + max_row_num = cur_row_num; base_reader_idx = i; } } - std::vector cumu_children; - for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) { + auto base_reader_child = _children.begin(); + std::advance(base_reader_child, base_reader_idx); + + std::list cumu_children; + int i = 0; + for (const auto& child : _children) { if (i != base_reader_idx) { - cumu_children.push_back(_children[i]); + cumu_children.push_back(child); } + ++i; } Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse); cumu_iter->init(); - std::vector children; - children.push_back(_children[base_reader_idx]); + std::list children; + children.push_back(*base_reader_child); children.push_back(cumu_iter); _inner_iter.reset(new Level1Iterator(children, _merge, _reverse)); } else { + // _children.size() == 1 _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); } } else { _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); } _inner_iter->init(); + // Clear _children earlier to release any related references + _children.clear(); } bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a, @@ -114,16 +121,6 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a return a->version() > b->version(); } -void CollectIterator::clear() { - for (auto child : _children) { - if (child != nullptr) { - delete child; - child = nullptr; - } - } - _children.clear(); -} - const RowCursor* CollectIterator::current_row(bool* delete_flag) const { if (LIKELY(_inner_iter)) { return _inner_iter->current_row(delete_flag); @@ -230,7 +227,7 @@ OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* de } CollectIterator::Level1Iterator::Level1Iterator( - const std::vector& children, bool merge, bool reverse) + const std::list& children, bool merge, bool reverse) : _children(children), _merge(merge), _reverse(reverse) {} CollectIterator::LevelIterator::~LevelIterator() {} @@ -250,7 +247,7 @@ CollectIterator::Level1Iterator::~Level1Iterator() { // OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. // Others when error happens OLAPStatus CollectIterator::Level1Iterator::next(const RowCursor** row, bool* delete_flag) { - if (UNLIKELY(_children.size() == 0)) { + if (UNLIKELY(_cur_child == nullptr)) { return OLAP_ERR_DATA_EOF; } if (_merge) { @@ -284,23 +281,25 @@ int64_t CollectIterator::Level1Iterator::version() const { } OLAPStatus CollectIterator::Level1Iterator::init() { - if (_children.size() == 0) { + if (_children.empty()) { return OLAP_SUCCESS; } + // Only when there are multiple children that need to be merged if (_merge && _children.size() > 1) { _heap.reset(new MergeHeap(LevelIteratorComparator(_reverse))); for (auto child : _children) { - if (child == nullptr || child->current_row() == nullptr) { - continue; - } + DCHECK(child != nullptr); + DCHECK(child->current_row() != nullptr); _heap->push(child); - _cur_child = _heap->top(); } + _cur_child = _heap->top(); + // Clear _children earlier to release any related references + _children.clear(); } else { _merge = false; _heap.reset(nullptr); - _cur_child = _children[_child_idx]; + _cur_child = *(_children.begin()); } return OLAP_SUCCESS; } @@ -313,6 +312,8 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** _heap->push(_cur_child); _cur_child = _heap->top(); } else if (res == OLAP_ERR_DATA_EOF) { + // current child has been read, to read next + delete _cur_child; if (!_heap->empty()) { _cur_child = _heap->top(); } else { @@ -320,6 +321,7 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** return OLAP_ERR_DATA_EOF; } } else { + _cur_child = nullptr; LOG(WARNING) << "failed to get next from child, res=" << res; return res; } @@ -333,10 +335,11 @@ inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor* if (LIKELY(res == OLAP_SUCCESS)) { return OLAP_SUCCESS; } else if (res == OLAP_ERR_DATA_EOF) { - // this child has been read, to read next - _child_idx++; - if (_child_idx < _children.size()) { - _cur_child = _children[_child_idx]; + // current child has been read, to read next + delete _cur_child; + _children.pop_front(); + if (!_children.empty()) { + _cur_child = *(_children.begin()); *row = _cur_child->current_row(delete_flag); return OLAP_SUCCESS; } else { @@ -344,6 +347,7 @@ inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor* return OLAP_ERR_DATA_EOF; } } else { + _cur_child = nullptr; LOG(WARNING) << "failed to get next from child, res=" << res; return res; } diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h index 1ed0831c5c6d82..92f3748eaafc8d 100644 --- a/be/src/olap/collect_iterator.h +++ b/be/src/olap/collect_iterator.h @@ -35,7 +35,7 @@ class CollectIterator { OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); - void build_heap(); + void build_heap(const std::vector& rs_readers); // Get top row of the heap, nullptr if reach end. const RowCursor* current_row(bool* delete_flag) const; @@ -47,9 +47,6 @@ class CollectIterator { // Others when error happens OLAPStatus next(const RowCursor** row, bool* delete_flag); - // Clear the MergeSet element and reset state. - void clear(); - private: // This interface is the actual implementation of the new version of iterator. // It currently contains two implementations, one is Level0Iterator, @@ -70,6 +67,7 @@ class CollectIterator { virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0; virtual ~LevelIterator() = 0; }; + // Compare row cursors between multiple merge elements, // if row cursors equal, compare data version. class LevelIteratorComparator { @@ -79,7 +77,6 @@ class CollectIterator { private: bool _reverse; - OlapReaderStatistics* _stats; }; typedef std::priority_queue, @@ -90,15 +87,15 @@ class CollectIterator { public: Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader); - OLAPStatus init(); + OLAPStatus init() override; - const RowCursor* current_row(bool* delete_flag) const; + const RowCursor* current_row(bool* delete_flag) const override; - const RowCursor* current_row() const; + const RowCursor* current_row() const override; - int64_t version() const; + int64_t version() const override; - OLAPStatus next(const RowCursor** row, bool* delete_flag); + OLAPStatus next(const RowCursor** row, bool* delete_flag) override; ~Level0Iterator(); @@ -109,27 +106,27 @@ class CollectIterator { OLAPStatus _refresh_current_row_v2(); RowsetReaderSharedPtr _rs_reader; - const RowCursor* _current_row = nullptr; + const RowCursor* _current_row = nullptr; // It points to the returned row bool _is_delete = false; Reader* _reader = nullptr; - // point to rows inside `_row_block` - RowCursor _row_cursor; + RowCursor _row_cursor; // It points to rows inside `_row_block`, maybe not returned RowBlock* _row_block = nullptr; }; + // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) class Level1Iterator : public LevelIterator { public: - Level1Iterator(const std::vector& children, bool merge, bool reverse); + Level1Iterator(const std::list& children, bool merge, bool reverse); - OLAPStatus init(); + OLAPStatus init() override; - const RowCursor* current_row(bool* delete_flag) const; + const RowCursor* current_row(bool* delete_flag) const override; - const RowCursor* current_row() const; + const RowCursor* current_row() const override; - int64_t version() const; + int64_t version() const override; - OLAPStatus next(const RowCursor** row, bool* delete_flag); + OLAPStatus next(const RowCursor** row, bool* delete_flag) override; ~Level1Iterator(); @@ -137,8 +134,9 @@ class CollectIterator { inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag); inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag); - // each Level0Iterator corresponds to a rowset reader - const std::vector _children; + // Each LevelIterator corresponds to a rowset reader, + // it will be cleared after '_heap' has been initilized when '_merge == true'. + std::list _children; // point to the Level0Iterator containing the next output row. // null when CollectIterator hasn't been initialized or reaches EOF. LevelIterator* _cur_child = nullptr; @@ -158,8 +156,9 @@ class CollectIterator { std::unique_ptr _inner_iter; - // each LevelIterator corresponds to a rowset reader - std::vector _children; + // Each LevelIterator corresponds to a rowset reader, + // it will be cleared after '_inner_iter' has been initilized. + std::list _children; bool _merge = true; bool _reverse = false; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c567e4a2865382..383ae9ced1df02 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -83,8 +83,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output_version=" << _output_version.first << "-" << _output_version.second - << ", permits: " << permits; + << ", output_version=" << _output_version << ", permits: " << permits; RETURN_NOT_OK(construct_output_rowset_writer()); RETURN_NOT_OK(construct_input_rowset_readers()); @@ -98,8 +97,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res << ", tablet=" << _tablet->full_name() - << ", output_version=" << _output_version.first << "-" - << _output_version.second; + << ", output_version=" << _output_version; return res; } TRACE("merge rowsets finished"); @@ -109,8 +107,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { _output_rowset = _output_rs_writer->build(); if (_output_rowset == nullptr) { LOG(WARNING) << "rowset writer build failed. writer version:" - << ", output_version=" << _output_version.first << "-" - << _output_version.second; + << ", output_version=" << _output_version; return OLAP_ERR_MALLOC_ERROR; } TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size()); @@ -128,6 +125,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { // 5. update last success compaction time int64_t now = UnixMillis(); + // TODO(yingchun): do the judge in Tablet class if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { _tablet->set_last_cumu_compaction_success_time(now); } else { @@ -141,7 +139,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { } LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output_version=" << _output_version.first << "-" << _output_version.second + << ", output_version=" << _output_version << ", current_max_version=" << current_max_version << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num << ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy=" diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp index 52bfb6cad8232c..66e97d6ddd3f0a 100644 --- a/be/src/olap/generic_iterators.cpp +++ b/be/src/olap/generic_iterators.cpp @@ -113,8 +113,13 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) { // } class MergeIteratorContext { public: - // This class don't take iter's ownership, client should delete it - MergeIteratorContext(RowwiseIterator* iter, std::shared_ptr parent) : _iter(iter), _block(iter->schema(), 1024, std::move(parent)) {} + MergeIteratorContext(RowwiseIterator* iter, std::shared_ptr parent) + : _iter(iter), _block(iter->schema(), 1024, std::move(parent)) {} + + ~MergeIteratorContext() { + delete _iter; + _iter = nullptr; + } // Initialize this context and will prepare data for current_row() Status init(const StorageReadOptions& opts); @@ -199,16 +204,15 @@ Status MergeIteratorContext::_load_next_block() { class MergeIterator : public RowwiseIterator { public: // MergeIterator takes the ownership of input iterators - MergeIterator(std::vector iters, std::shared_ptr parent) : _origin_iters(std::move(iters)) { + MergeIterator(std::list iters, std::shared_ptr parent) : _origin_iters(std::move(iters)) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", parent, false); } ~MergeIterator() override { - for (auto iter : _origin_iters) { - delete iter; - } - for (auto ctx : _merge_ctxs) { + while (!_merge_heap->empty()) { + auto ctx = _merge_heap->top(); + _merge_heap->pop(); delete ctx; } } @@ -218,8 +222,8 @@ class MergeIterator : public RowwiseIterator { const Schema& schema() const override { return *_schema; } private: - std::vector _origin_iters; - std::vector _merge_ctxs; + // It will be released after '_merge_heap' has been built. + std::list _origin_iters; std::unique_ptr _schema; @@ -247,18 +251,18 @@ Status MergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); } - _schema.reset(new Schema(_origin_iters[0]->schema())); - _merge_heap.reset(new MergeHeap); + _schema.reset(new Schema((*(_origin_iters.begin()))->schema())); + _merge_heap.reset(new MergeHeap); for (auto iter : _origin_iters) { std::unique_ptr ctx(new MergeIteratorContext(iter, _mem_tracker)); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; } - _merge_heap->push(ctx.get()); - _merge_ctxs.push_back(ctx.release()); + _merge_heap->push(ctx.release()); } + _origin_iters.clear(); return Status::OK(); } @@ -279,6 +283,9 @@ Status MergeIterator::next_batch(RowBlockV2* block) { RETURN_IF_ERROR(ctx->advance()); if (ctx->valid()) { _merge_heap->push(ctx); + } else { + // Release ctx earlier to reduce resource consumed + delete ctx; } } block->set_num_rows(row_idx); @@ -296,7 +303,8 @@ class UnionIterator : public RowwiseIterator { // Iterators' ownership it transfered to this class. // This class will delete all iterators when destructs // Client should not use iterators any more. - UnionIterator(std::vector iters, std::shared_ptr parent) : _origin_iters(std::move(iters)) { + UnionIterator(std::list iters, std::shared_ptr parent) + : _origin_iters(std::move(iters)) { _mem_tracker = MemTracker::CreateTracker(-1, "UnionIterator", parent, false); } @@ -308,46 +316,54 @@ class UnionIterator : public RowwiseIterator { Status init(const StorageReadOptions& opts) override; Status next_batch(RowBlockV2* block) override; - const Schema& schema() const override { return _origin_iters[0]->schema(); } + const Schema& schema() const override { return *_schema; } private: - std::vector _origin_iters; - size_t _iter_idx = 0; + std::unique_ptr _schema; + RowwiseIterator* _cur_iter = nullptr; + std::list _origin_iters; }; Status UnionIterator::init(const StorageReadOptions& opts) { + if (_origin_iters.empty()) { + return Status::OK(); + } + for (auto iter : _origin_iters) { RETURN_IF_ERROR(iter->init(opts)); } + _schema.reset(new Schema((*(_origin_iters.begin()))->schema())); + _cur_iter = *(_origin_iters.begin()); return Status::OK(); } Status UnionIterator::next_batch(RowBlockV2* block) { - if (_iter_idx >= _origin_iters.size()) { - return Status::EndOfFile("End of UnionIterator"); - } - do { - auto iter = _origin_iters[_iter_idx]; - auto st = iter->next_batch(block); + while (_cur_iter != nullptr) { + auto st = _cur_iter->next_batch(block); if (st.is_end_of_file()) { - _iter_idx++; + delete _cur_iter; + _cur_iter = nullptr; + _origin_iters.pop_front(); + if (!_origin_iters.empty()) { + _cur_iter = *(_origin_iters.begin()); + } } else { return st; } - } while (_iter_idx < _origin_iters.size()); + } return Status::EndOfFile("End of UnionIterator"); } -RowwiseIterator* new_merge_iterator(std::vector inputs, std::shared_ptr parent) { +RowwiseIterator* new_merge_iterator(std::list inputs, std::shared_ptr parent) { if (inputs.size() == 1) { - return inputs[0]; + return *(inputs.begin()); } return new MergeIterator(std::move(inputs), parent); } -RowwiseIterator* new_union_iterator(std::vector inputs, std::shared_ptr parent) { +RowwiseIterator* new_union_iterator(std::list inputs, std::shared_ptr parent) { if (inputs.size() == 1) { - return inputs[0]; + return *(inputs.begin()); } return new UnionIterator(std::move(inputs), parent); } diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h index 9f95d03d1f4a91..139294217bfad2 100644 --- a/be/src/olap/generic_iterators.h +++ b/be/src/olap/generic_iterators.h @@ -25,14 +25,14 @@ namespace doris { // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_merge_iterator(std::vector inputs, std::shared_ptr parent); +RowwiseIterator* new_merge_iterator(std::list inputs, std::shared_ptr parent); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. // // Inputs iterators' ownership is taken by created union iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_union_iterator(std::vector inputs, std::shared_ptr parent); +RowwiseIterator* new_union_iterator(std::list inputs, std::shared_ptr parent); // Create an auto increment iterator which returns num_rows data in format of schema. // This class aims to be used in unit test. diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 932dcf60853cbe..69301ea06ee0d3 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -48,6 +48,7 @@ OLAPStatus Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, "failed to init row cursor when merging rowsets of tablet " + tablet->full_name()); row_cursor.allocate_memory_for_string_type(tablet->tablet_schema()); + // TODO(yingchun): monitor std::shared_ptr tracker(new MemTracker(-1)); std::unique_ptr mem_pool(new MemPool(tracker.get())); diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 39081eee719f73..ce88f23e4ca19c 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -102,6 +102,7 @@ Reader::~Reader() { } OLAPStatus Reader::init(const ReaderParams& read_params) { + // TODO(yingchun): monitor _tracker.reset(new MemTracker(-1, read_params.tablet->full_name())); _predicate_mem_pool.reset(new MemPool(_tracker.get())); @@ -115,7 +116,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return res; } - res = _capture_rs_readers(read_params); + std::vector rs_readers; + res = _capture_rs_readers(read_params, &rs_readers); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res << ", tablet_id:" << read_params.tablet->tablet_id() @@ -125,11 +127,36 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return res; } - // When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation + if (_optimize_for_single_rowset(rs_readers)) { + _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row + : &Reader::_direct_next_row; + return OLAP_SUCCESS; + } + + switch (_tablet->keys_type()) { + case KeysType::DUP_KEYS: + _next_row_func = &Reader::_direct_next_row; + break; + case KeysType::UNIQUE_KEYS: + _next_row_func = &Reader::_unique_key_next_row; + break; + case KeysType::AGG_KEYS: + _next_row_func = &Reader::_agg_key_next_row; + break; + default: + DCHECK(false) << "No next row function for type:" << _tablet->keys_type(); + break; + } + + return OLAP_SUCCESS; +} + +// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation +bool Reader::_optimize_for_single_rowset(const std::vector& rs_readers) { bool has_delete_rowset = false; bool has_overlapping = false; int nonoverlapping_count = 0; - for (auto rs_reader : _rs_readers) { + for (const auto& rs_reader : rs_readers) { if (rs_reader->rowset()->rowset_meta()->delete_flag()) { has_delete_rowset = true; break; @@ -144,27 +171,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { } } } - if (!has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset) { - _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row - : &Reader::_direct_next_row; - } else { - switch (_tablet->keys_type()) { - case KeysType::DUP_KEYS: - _next_row_func = &Reader::_direct_next_row; - break; - case KeysType::UNIQUE_KEYS: - _next_row_func = &Reader::_unique_key_next_row; - break; - case KeysType::AGG_KEYS: - _next_row_func = &Reader::_agg_key_next_row; - break; - default: - break; - } - } - DCHECK(_next_row_func != nullptr) << "No next row function for type:" << _tablet->keys_type(); - return OLAP_SUCCESS; + return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset; } OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, @@ -180,6 +188,7 @@ OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, Ob } return OLAP_SUCCESS; } + OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) { if (UNLIKELY(_next_key == nullptr)) { @@ -188,7 +197,7 @@ OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_ } init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); auto res = _collect_iter->next(&_next_key, &_next_delete_flag); - if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { + if (UNLIKELY(res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF)) { return res; } if (_need_agg_finalize) { @@ -301,7 +310,8 @@ void Reader::close() { } } -OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { +OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params, + std::vector* valid_rs_readers) { const std::vector* rs_readers = &read_params.rs_readers; if (rs_readers->empty()) { LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name(); @@ -399,10 +409,10 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { return res; } if (res == OLAP_SUCCESS) { - _rs_readers.push_back(rs_reader); + valid_rs_readers->push_back(rs_reader); } } - _collect_iter->build_heap(); + _collect_iter->build_heap(*valid_rs_readers); _next_key = _collect_iter->current_row(&_next_delete_flag); return OLAP_SUCCESS; } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 788e48cb76ddd3..766f8ec4f054d0 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -124,7 +124,9 @@ class Reader { OLAPStatus _init_params(const ReaderParams& read_params); - OLAPStatus _capture_rs_readers(const ReaderParams& read_params); + OLAPStatus _capture_rs_readers(const ReaderParams& read_params, std::vector* valid_rs_readers); + + bool _optimize_for_single_rowset(const std::vector& rs_readers); OLAPStatus _init_keys_param(const ReaderParams& read_params); @@ -174,7 +176,6 @@ class Reader { std::vector _seek_columns; TabletSharedPtr _tablet; - std::vector _rs_readers; RowsetReaderContext _reader_context; KeysParam _keys_param; std::vector _is_lower_keys_included; diff --git a/be/src/olap/row_block.h b/be/src/olap/row_block.h index b44a55014e309f..75924fa63228de 100644 --- a/be/src/olap/row_block.h +++ b/be/src/olap/row_block.h @@ -72,6 +72,8 @@ class RowBlock { cursor->attach(_mem_buf + row_index * _mem_row_bytes); } + // TODO(yingchun): why not use _pos directly? + template inline void set_row(uint32_t row_index, const RowType& row) const { memcpy(_mem_buf + row_index * _mem_row_bytes, row.row_ptr(), _mem_row_bytes); diff --git a/be/src/olap/row_cursor.cpp b/be/src/olap/row_cursor.cpp index ca2f690f5063fd..e76d9bea40cc1d 100644 --- a/be/src/olap/row_cursor.cpp +++ b/be/src/olap/row_cursor.cpp @@ -165,6 +165,7 @@ OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema, return OLAP_SUCCESS; } +// TODO(yingchun): parameter 'const TabletSchema& schema' is not used OLAPStatus RowCursor::allocate_memory_for_string_type(const TabletSchema& schema) { // allocate memory for string type(char, varchar, hll) // The memory allocated in this function is used in aggregate and copy function diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp b/be/src/olap/rowset/alpha_rowset_reader.cpp index 1abc6dafef5ec1..9a72daee880137 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.cpp +++ b/be/src/olap/rowset/alpha_rowset_reader.cpp @@ -37,6 +37,15 @@ AlphaRowsetReader::AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetShar AlphaRowsetReader::~AlphaRowsetReader() { delete _dst_cursor; _rowset->release(); + while (!_merge_heap.empty()) { + auto ctx = _merge_heap.top(); + _merge_heap.pop(); + delete ctx; + } + for (auto ctx : _sequential_ctxs) { + delete ctx; + } + _sequential_ctxs.clear(); } OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { @@ -50,15 +59,14 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { } _is_segments_overlapping = _alpha_rowset_meta->is_segments_overlapping(); - _ordinal = 0; RETURN_NOT_OK(_init_merge_ctxs(read_context)); // needs to sort merge only when // 1) we are told to return sorted result (need_ordered_result) - // 2) we have several segment groups (_is_segments_overlapping && _merge_ctxs.size() > 1) + // 2) we have several segment groups (_is_segments_overlapping && _sequential_ctxs.size() > 1) if (_current_read_context->need_ordered_result && _is_segments_overlapping && - _merge_ctxs.size() > 1) { + _sequential_ctxs.size() > 1) { _next_block = &AlphaRowsetReader::_merge_block; _read_block.reset(new (std::nothrow) RowBlock(_current_read_context->tablet_schema, _parent_tracker)); @@ -79,22 +87,23 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { // Upon rollup/alter table, seek_columns is nullptr. // Under this circumstance, init RowCursor with all columns. _dst_cursor->init(*(_current_read_context->tablet_schema)); - for (size_t i = 0; i < _merge_ctxs.size(); ++i) { - _merge_ctxs[i].row_cursor.reset(new (std::nothrow) RowCursor()); - _merge_ctxs[i].row_cursor->init(*(_current_read_context->tablet_schema)); + for (auto ctx : _sequential_ctxs) { + ctx->row_cursor.reset(new (std::nothrow) RowCursor()); + ctx->row_cursor->init(*(_current_read_context->tablet_schema)); } } else { _dst_cursor->init(*(_current_read_context->tablet_schema), *(_current_read_context->seek_columns)); - for (size_t i = 0; i < _merge_ctxs.size(); ++i) { - _merge_ctxs[i].row_cursor.reset(new (std::nothrow) RowCursor()); - _merge_ctxs[i].row_cursor->init(*(_current_read_context->tablet_schema), - *(_current_read_context->seek_columns)); + for (auto ctx : _sequential_ctxs) { + ctx->row_cursor.reset(new (std::nothrow) RowCursor()); + ctx->row_cursor->init(*(_current_read_context->tablet_schema), + *(_current_read_context->seek_columns)); } } RETURN_NOT_OK(_init_merge_heap()); } else { _next_block = &AlphaRowsetReader::_union_block; + _cur_ctx = *(_sequential_ctxs.begin()); } return OLAP_SUCCESS; } @@ -120,20 +129,24 @@ int64_t AlphaRowsetReader::filtered_rows() { } OLAPStatus AlphaRowsetReader::_union_block(RowBlock** block) { - while (_ordinal < _merge_ctxs.size()) { + while (_cur_ctx != nullptr) { // union block only use one block to store - OLAPStatus status = _pull_next_block(&(_merge_ctxs[_ordinal])); + OLAPStatus status = _pull_next_block(_cur_ctx); if (status == OLAP_ERR_DATA_EOF) { - _ordinal++; - continue; + delete _cur_ctx; + _cur_ctx = nullptr; + _sequential_ctxs.pop_front(); + if (!_sequential_ctxs.empty()) { + _cur_ctx = *(_sequential_ctxs.begin()); + } } else if (status != OLAP_SUCCESS) { return status; } else { - (*block) = _merge_ctxs[_ordinal].row_block; + (*block) = _cur_ctx->row_block; return OLAP_SUCCESS; } } - if (_ordinal == _merge_ctxs.size()) { + if (_sequential_ctxs.empty()) { *block = nullptr; return OLAP_ERR_DATA_EOF; } @@ -148,6 +161,7 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { _read_block->clear(); size_t num_rows_in_block = 0; while (_read_block->pos() < _num_rows_per_row_block) { + // 1. Read one row from heap RowCursor* row_cursor = nullptr; status = _pull_next_row_for_merge_rowset_v2(&row_cursor); if (status == OLAP_ERR_DATA_EOF && _read_block->pos() > 0) { @@ -159,11 +173,13 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { VLOG_TRACE << "get merged row: " << row_cursor->to_string(); + // 2. Copy the row to buffer block _read_block->get_row(_read_block->pos(), _dst_cursor); copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool()); _read_block->pos_inc(); num_rows_in_block++; + // 3. Adjust heap // MergeHeap should advance one step after row been read. // This function must be called after copy_row // Otherwise, the row has read will be modified instantly before handled. @@ -174,6 +190,7 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { // The returned row will be (2, 2) instead of (1, 1) AlphaMergeContext* merge_ctx = _merge_heap.top(); _merge_heap.pop(); + // merge_ctx will not be pushed back into heap if it is EOF RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx)); } _read_block->set_pos(0); @@ -184,17 +201,19 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { } OLAPStatus AlphaRowsetReader::_init_merge_heap() { - if (_merge_heap.empty() && !_merge_ctxs.empty()) { - for (auto& merge_ctx : _merge_ctxs) { - RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(&merge_ctx)); - } + DCHECK(_merge_heap.empty()); + DCHECK(!_sequential_ctxs.empty()); + for (auto merge_ctx : _sequential_ctxs) { + RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx)); } + _sequential_ctxs.clear(); return OLAP_SUCCESS; } OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx) { - if (merge_ctx->is_eof) { - // nothing in this merge ctx, just return + if (OLAP_UNLIKELY(merge_ctx->is_eof)) { + // nothing in this merge ctx, release and return + delete merge_ctx; return OLAP_SUCCESS; } @@ -202,9 +221,11 @@ OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeC if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) { OLAPStatus status = _pull_next_block(merge_ctx); if (status == OLAP_ERR_DATA_EOF) { - merge_ctx->is_eof = true; + // nothing in this merge ctx, release and return + delete merge_ctx; return OLAP_SUCCESS; } else if (status != OLAP_SUCCESS) { + delete merge_ctx; LOG(WARNING) << "read next row of singleton rowset failed:" << status; return status; } @@ -221,7 +242,7 @@ OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeC OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row) { // if _merge_heap is not empty, return the row at top, and insert a new row // from corresponding merge_ctx - if (!_merge_heap.empty()) { + if (OLAP_LIKELY(!_merge_heap.empty())) { AlphaMergeContext* merge_ctx = _merge_heap.top(); *row = merge_ctx->row_cursor.get(); // Must not rebuild merge_heap in this place. @@ -234,39 +255,6 @@ OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row } } -OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset(RowCursor** row) { - RowCursor* min_row = nullptr; - int min_index = -1; - - size_t ordinal = 0; - while (ordinal < _merge_ctxs.size()) { - AlphaMergeContext* merge_ctx = &(_merge_ctxs[ordinal]); - if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) { - OLAPStatus status = _pull_next_block(merge_ctx); - if (status == OLAP_ERR_DATA_EOF) { - _merge_ctxs.erase(_merge_ctxs.begin() + ordinal); - continue; - } else if (status != OLAP_SUCCESS) { - LOG(WARNING) << "read next row of singleton rowset failed:" << status; - return status; - } - } - RowCursor* current_row = merge_ctx->row_cursor.get(); - merge_ctx->row_block->get_row(merge_ctx->row_block->pos(), current_row); - if (min_row == nullptr || compare_row(*min_row, *current_row) > 0) { - min_row = current_row; - min_index = ordinal; - } - ordinal++; - } - if (min_row == nullptr || min_index == -1) { - return OLAP_ERR_DATA_EOF; - } - *row = min_row; - _merge_ctxs[min_index].row_block->pos_inc(); - return OLAP_SUCCESS; -} - OLAPStatus AlphaRowsetReader::_pull_next_block(AlphaMergeContext* merge_ctx) { OLAPStatus status = OLAP_SUCCESS; if (OLAP_UNLIKELY(merge_ctx->first_read_symbol)) { @@ -394,14 +382,14 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context << new_column_data->version(); new_column_data->set_delete_status(DEL_NOT_SATISFIED); } - AlphaMergeContext merge_ctx; - merge_ctx.column_data = std::move(new_column_data); - _merge_ctxs.emplace_back(std::move(merge_ctx)); + auto merge_ctx = new AlphaMergeContext(); + merge_ctx->column_data = std::move(new_column_data); + _sequential_ctxs.emplace_back(merge_ctx); } - if (!_is_segments_overlapping && _merge_ctxs.size() > 1) { + if (!_is_segments_overlapping && _sequential_ctxs.size() > 1) { LOG(WARNING) << "invalid column_data for cumulative rowset. column_data size:" - << _merge_ctxs.size(); + << _sequential_ctxs.size(); return OLAP_ERR_READER_READING_ERROR; } return OLAP_SUCCESS; diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h index 53f1aef969e324..a8e22155645032 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -80,7 +80,6 @@ class AlphaRowsetReader : public RowsetReader { OLAPStatus _union_block(RowBlock** block); OLAPStatus _merge_block(RowBlock** block); - OLAPStatus _pull_next_row_for_merge_rowset(RowCursor** row); OLAPStatus _pull_next_block(AlphaMergeContext* merge_ctx); // Doris will split query predicates to several scan keys @@ -89,8 +88,6 @@ class AlphaRowsetReader : public RowsetReader { OLAPStatus _pull_first_block(AlphaMergeContext* merge_ctx); // merge by priority queue(_merge_heap) - // this method has same function with _pull_next_row_for_merge_rowset, but using heap merge. - // and this should replace the _pull_next_row_for_merge_rowset later. OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row); // init the merge heap, this should be call before calling _pull_next_row_for_merge_rowset_v2(); OLAPStatus _init_merge_heap(); @@ -108,7 +105,10 @@ class AlphaRowsetReader : public RowsetReader { AlphaRowsetMeta* _alpha_rowset_meta; const std::vector>& _segment_groups; - std::vector _merge_ctxs; + // In '_union_block' mode, it has items to traverse. + // In '_merge_block' mode, it will be cleared after '_merge_heap' has been built. + std::list _sequential_ctxs; + std::unique_ptr _read_block; OLAPStatus (AlphaRowsetReader::*_next_block)(RowBlock** block) = nullptr; RowCursor* _dst_cursor = nullptr; @@ -119,8 +119,8 @@ class AlphaRowsetReader : public RowsetReader { // into consideration deliberately. bool _is_segments_overlapping; - // ordinal of ColumnData upon reading - size_t _ordinal; + // Current AlphaMergeContext to read data, just valid in '_union_block' mode. + AlphaMergeContext* _cur_ctx = nullptr; RowsetReaderContext* _current_read_context; OlapReaderStatistics _owned_stats; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index cbfecfa2e89e05..d0e74966f06d7a 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -98,7 +98,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { } seg_iterators.push_back(std::move(iter)); } - std::vector iterators; + std::list iterators; for (auto& owned_it : seg_iterators) { // transfer ownership of segment iterator to `_iterator` iterators.push_back(owned_it.release()); diff --git a/be/src/olap/rowset/unique_rowset_id_generator.h b/be/src/olap/rowset/unique_rowset_id_generator.h index 22909bc1133795..593a9155e52a14 100644 --- a/be/src/olap/rowset/unique_rowset_id_generator.h +++ b/be/src/olap/rowset/unique_rowset_id_generator.h @@ -25,6 +25,7 @@ namespace doris { +// TODO(yingchun): why use two classes? class UniqueRowsetIdGenerator : public RowsetIdGenerator { public: UniqueRowsetIdGenerator(const UniqueId& backend_uid); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 46dbf06c8da255..e8a1defe771b34 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -73,7 +73,6 @@ using std::list; using std::map; using std::nothrow; using std::pair; -using std::priority_queue; using std::set; using std::set_difference; using std::string; diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp index e4271b6fd149b1..072d589fddf8d6 100644 --- a/be/test/olap/generic_iterators_test.cpp +++ b/be/test/olap/generic_iterators_test.cpp @@ -77,7 +77,7 @@ TEST(GenericIteratorsTest, AutoIncrement) { TEST(GenericIteratorsTest, Union) { auto schema = create_schema(); - std::vector inputs; + std::list inputs; inputs.push_back(new_auto_increment_iterator(schema, 100)); inputs.push_back(new_auto_increment_iterator(schema, 200)); @@ -116,7 +116,7 @@ TEST(GenericIteratorsTest, Union) { TEST(GenericIteratorsTest, Merge) { auto schema = create_schema(); - std::vector inputs; + std::list inputs; inputs.push_back(new_auto_increment_iterator(schema, 100)); inputs.push_back(new_auto_increment_iterator(schema, 200)); From 6e39aec4f9b5cda3fe216c7099fc4b33aa513bd9 Mon Sep 17 00:00:00 2001 From: Yingchun Lai <405403881@qq.com> Date: Tue, 8 Jun 2021 15:01:34 +0800 Subject: [PATCH 2/2] update by cr comments --- be/src/olap/rowset/alpha_rowset_reader.h | 7 +++---- be/src/olap/rowset/unique_rowset_id_generator.h | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h index a8e22155645032..1667449dcfde83 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -121,14 +121,13 @@ class AlphaRowsetReader : public RowsetReader { // Current AlphaMergeContext to read data, just valid in '_union_block' mode. AlphaMergeContext* _cur_ctx = nullptr; + // A priority queue for merging rowsets, just valid in '_merge_block' mode. + std::priority_queue, AlphaMergeContextComparator> + _merge_heap; RowsetReaderContext* _current_read_context; OlapReaderStatistics _owned_stats; OlapReaderStatistics* _stats = &_owned_stats; - - // a priority queue for merging rowsets - std::priority_queue, AlphaMergeContextComparator> - _merge_heap; }; } // namespace doris diff --git a/be/src/olap/rowset/unique_rowset_id_generator.h b/be/src/olap/rowset/unique_rowset_id_generator.h index 593a9155e52a14..22909bc1133795 100644 --- a/be/src/olap/rowset/unique_rowset_id_generator.h +++ b/be/src/olap/rowset/unique_rowset_id_generator.h @@ -25,7 +25,6 @@ namespace doris { -// TODO(yingchun): why use two classes? class UniqueRowsetIdGenerator : public RowsetIdGenerator { public: UniqueRowsetIdGenerator(const UniqueId& backend_uid);