diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index cf9fdbedf4b610..5fce312cc1d0f6 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -44,55 +44,62 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { return OLAP_ERR_DATA_EOF; } - LevelIterator* child_ptr = child.release(); - _children.push_back(child_ptr); + _children.push_back(child.release()); return OLAP_SUCCESS; } // Build a merge heap. If _merge is true, a rowset with the max rownum // status will be used as the base rowset, and the other rowsets will be merged first and // then merged with the base rowset. -void CollectIterator::build_heap() { - DCHECK(_reader->_rs_readers.size() == _children.size()); +void CollectIterator::build_heap(const std::vector& rs_readers) { + DCHECK(rs_readers.size() == _children.size()); _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; if (_children.empty()) { _inner_iter.reset(nullptr); return; } else if (_merge) { - DCHECK(!_reader->_rs_readers.empty()); - // build merge heap with two children, a base rowset as level0iterator and + DCHECK(!rs_readers.empty()); + // build merge heap with two children, a base rowset as level0iterator and // other cumulative rowsets as a level1iterator if (_children.size() > 1) { - // find base rowset(max rownum), - RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0]; + // find 'base rowset', 'base rowset' is the rowset which contains the max row number + int64_t max_row_num = 0; int base_reader_idx = 0; - for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) { - if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() > - base_reader->rowset()->rowset_meta()->num_rows()) { - base_reader = _reader->_rs_readers[i]; + for (size_t i = 0; i < rs_readers.size(); ++i) { + int64_t cur_row_num = rs_readers[i]->rowset()->rowset_meta()->num_rows(); + if (cur_row_num > max_row_num) { + max_row_num = cur_row_num; base_reader_idx = i; } } - std::vector cumu_children; - for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) { + auto base_reader_child = _children.begin(); + std::advance(base_reader_child, base_reader_idx); + + std::list cumu_children; + int i = 0; + for (const auto& child : _children) { if (i != base_reader_idx) { - cumu_children.push_back(_children[i]); + cumu_children.push_back(child); } + ++i; } Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse); cumu_iter->init(); - std::vector children; - children.push_back(_children[base_reader_idx]); + std::list children; + children.push_back(*base_reader_child); children.push_back(cumu_iter); _inner_iter.reset(new Level1Iterator(children, _merge, _reverse)); } else { + // _children.size() == 1 _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); } } else { _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); } _inner_iter->init(); + // Clear _children earlier to release any related references + _children.clear(); } bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a, @@ -114,16 +121,6 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a return a->version() > b->version(); } -void CollectIterator::clear() { - for (auto child : _children) { - if (child != nullptr) { - delete child; - child = nullptr; - } - } - _children.clear(); -} - const RowCursor* CollectIterator::current_row(bool* delete_flag) const { if (LIKELY(_inner_iter)) { return _inner_iter->current_row(delete_flag); @@ -230,7 +227,7 @@ OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* de } CollectIterator::Level1Iterator::Level1Iterator( - const std::vector& children, bool merge, bool reverse) + const std::list& children, bool merge, bool reverse) : _children(children), _merge(merge), _reverse(reverse) {} CollectIterator::LevelIterator::~LevelIterator() {} @@ -250,7 +247,7 @@ CollectIterator::Level1Iterator::~Level1Iterator() { // OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. // Others when error happens OLAPStatus CollectIterator::Level1Iterator::next(const RowCursor** row, bool* delete_flag) { - if (UNLIKELY(_children.size() == 0)) { + if (UNLIKELY(_cur_child == nullptr)) { return OLAP_ERR_DATA_EOF; } if (_merge) { @@ -284,23 +281,25 @@ int64_t CollectIterator::Level1Iterator::version() const { } OLAPStatus CollectIterator::Level1Iterator::init() { - if (_children.size() == 0) { + if (_children.empty()) { return OLAP_SUCCESS; } + // Only when there are multiple children that need to be merged if (_merge && _children.size() > 1) { _heap.reset(new MergeHeap(LevelIteratorComparator(_reverse))); for (auto child : _children) { - if (child == nullptr || child->current_row() == nullptr) { - continue; - } + DCHECK(child != nullptr); + DCHECK(child->current_row() != nullptr); _heap->push(child); - _cur_child = _heap->top(); } + _cur_child = _heap->top(); + // Clear _children earlier to release any related references + _children.clear(); } else { _merge = false; _heap.reset(nullptr); - _cur_child = _children[_child_idx]; + _cur_child = *(_children.begin()); } return OLAP_SUCCESS; } @@ -313,6 +312,8 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** _heap->push(_cur_child); _cur_child = _heap->top(); } else if (res == OLAP_ERR_DATA_EOF) { + // current child has been read, to read next + delete _cur_child; if (!_heap->empty()) { _cur_child = _heap->top(); } else { @@ -320,6 +321,7 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** return OLAP_ERR_DATA_EOF; } } else { + _cur_child = nullptr; LOG(WARNING) << "failed to get next from child, res=" << res; return res; } @@ -333,10 +335,11 @@ inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor* if (LIKELY(res == OLAP_SUCCESS)) { return OLAP_SUCCESS; } else if (res == OLAP_ERR_DATA_EOF) { - // this child has been read, to read next - _child_idx++; - if (_child_idx < _children.size()) { - _cur_child = _children[_child_idx]; + // current child has been read, to read next + delete _cur_child; + _children.pop_front(); + if (!_children.empty()) { + _cur_child = *(_children.begin()); *row = _cur_child->current_row(delete_flag); return OLAP_SUCCESS; } else { @@ -344,6 +347,7 @@ inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor* return OLAP_ERR_DATA_EOF; } } else { + _cur_child = nullptr; LOG(WARNING) << "failed to get next from child, res=" << res; return res; } diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h index 1ed0831c5c6d82..92f3748eaafc8d 100644 --- a/be/src/olap/collect_iterator.h +++ b/be/src/olap/collect_iterator.h @@ -35,7 +35,7 @@ class CollectIterator { OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); - void build_heap(); + void build_heap(const std::vector& rs_readers); // Get top row of the heap, nullptr if reach end. const RowCursor* current_row(bool* delete_flag) const; @@ -47,9 +47,6 @@ class CollectIterator { // Others when error happens OLAPStatus next(const RowCursor** row, bool* delete_flag); - // Clear the MergeSet element and reset state. - void clear(); - private: // This interface is the actual implementation of the new version of iterator. // It currently contains two implementations, one is Level0Iterator, @@ -70,6 +67,7 @@ class CollectIterator { virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0; virtual ~LevelIterator() = 0; }; + // Compare row cursors between multiple merge elements, // if row cursors equal, compare data version. class LevelIteratorComparator { @@ -79,7 +77,6 @@ class CollectIterator { private: bool _reverse; - OlapReaderStatistics* _stats; }; typedef std::priority_queue, @@ -90,15 +87,15 @@ class CollectIterator { public: Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader); - OLAPStatus init(); + OLAPStatus init() override; - const RowCursor* current_row(bool* delete_flag) const; + const RowCursor* current_row(bool* delete_flag) const override; - const RowCursor* current_row() const; + const RowCursor* current_row() const override; - int64_t version() const; + int64_t version() const override; - OLAPStatus next(const RowCursor** row, bool* delete_flag); + OLAPStatus next(const RowCursor** row, bool* delete_flag) override; ~Level0Iterator(); @@ -109,27 +106,27 @@ class CollectIterator { OLAPStatus _refresh_current_row_v2(); RowsetReaderSharedPtr _rs_reader; - const RowCursor* _current_row = nullptr; + const RowCursor* _current_row = nullptr; // It points to the returned row bool _is_delete = false; Reader* _reader = nullptr; - // point to rows inside `_row_block` - RowCursor _row_cursor; + RowCursor _row_cursor; // It points to rows inside `_row_block`, maybe not returned RowBlock* _row_block = nullptr; }; + // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) class Level1Iterator : public LevelIterator { public: - Level1Iterator(const std::vector& children, bool merge, bool reverse); + Level1Iterator(const std::list& children, bool merge, bool reverse); - OLAPStatus init(); + OLAPStatus init() override; - const RowCursor* current_row(bool* delete_flag) const; + const RowCursor* current_row(bool* delete_flag) const override; - const RowCursor* current_row() const; + const RowCursor* current_row() const override; - int64_t version() const; + int64_t version() const override; - OLAPStatus next(const RowCursor** row, bool* delete_flag); + OLAPStatus next(const RowCursor** row, bool* delete_flag) override; ~Level1Iterator(); @@ -137,8 +134,9 @@ class CollectIterator { inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag); inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag); - // each Level0Iterator corresponds to a rowset reader - const std::vector _children; + // Each LevelIterator corresponds to a rowset reader, + // it will be cleared after '_heap' has been initilized when '_merge == true'. + std::list _children; // point to the Level0Iterator containing the next output row. // null when CollectIterator hasn't been initialized or reaches EOF. LevelIterator* _cur_child = nullptr; @@ -158,8 +156,9 @@ class CollectIterator { std::unique_ptr _inner_iter; - // each LevelIterator corresponds to a rowset reader - std::vector _children; + // Each LevelIterator corresponds to a rowset reader, + // it will be cleared after '_inner_iter' has been initilized. + std::list _children; bool _merge = true; bool _reverse = false; diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c567e4a2865382..383ae9ced1df02 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -83,8 +83,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { _tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash); LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output_version=" << _output_version.first << "-" << _output_version.second - << ", permits: " << permits; + << ", output_version=" << _output_version << ", permits: " << permits; RETURN_NOT_OK(construct_output_rowset_writer()); RETURN_NOT_OK(construct_input_rowset_readers()); @@ -98,8 +97,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res << ", tablet=" << _tablet->full_name() - << ", output_version=" << _output_version.first << "-" - << _output_version.second; + << ", output_version=" << _output_version; return res; } TRACE("merge rowsets finished"); @@ -109,8 +107,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { _output_rowset = _output_rs_writer->build(); if (_output_rowset == nullptr) { LOG(WARNING) << "rowset writer build failed. writer version:" - << ", output_version=" << _output_version.first << "-" - << _output_version.second; + << ", output_version=" << _output_version; return OLAP_ERR_MALLOC_ERROR; } TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size()); @@ -128,6 +125,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { // 5. update last success compaction time int64_t now = UnixMillis(); + // TODO(yingchun): do the judge in Tablet class if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { _tablet->set_last_cumu_compaction_success_time(now); } else { @@ -141,7 +139,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { } LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output_version=" << _output_version.first << "-" << _output_version.second + << ", output_version=" << _output_version << ", current_max_version=" << current_max_version << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num << ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy=" diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp index 52bfb6cad8232c..66e97d6ddd3f0a 100644 --- a/be/src/olap/generic_iterators.cpp +++ b/be/src/olap/generic_iterators.cpp @@ -113,8 +113,13 @@ Status AutoIncrementIterator::next_batch(RowBlockV2* block) { // } class MergeIteratorContext { public: - // This class don't take iter's ownership, client should delete it - MergeIteratorContext(RowwiseIterator* iter, std::shared_ptr parent) : _iter(iter), _block(iter->schema(), 1024, std::move(parent)) {} + MergeIteratorContext(RowwiseIterator* iter, std::shared_ptr parent) + : _iter(iter), _block(iter->schema(), 1024, std::move(parent)) {} + + ~MergeIteratorContext() { + delete _iter; + _iter = nullptr; + } // Initialize this context and will prepare data for current_row() Status init(const StorageReadOptions& opts); @@ -199,16 +204,15 @@ Status MergeIteratorContext::_load_next_block() { class MergeIterator : public RowwiseIterator { public: // MergeIterator takes the ownership of input iterators - MergeIterator(std::vector iters, std::shared_ptr parent) : _origin_iters(std::move(iters)) { + MergeIterator(std::list iters, std::shared_ptr parent) : _origin_iters(std::move(iters)) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", parent, false); } ~MergeIterator() override { - for (auto iter : _origin_iters) { - delete iter; - } - for (auto ctx : _merge_ctxs) { + while (!_merge_heap->empty()) { + auto ctx = _merge_heap->top(); + _merge_heap->pop(); delete ctx; } } @@ -218,8 +222,8 @@ class MergeIterator : public RowwiseIterator { const Schema& schema() const override { return *_schema; } private: - std::vector _origin_iters; - std::vector _merge_ctxs; + // It will be released after '_merge_heap' has been built. + std::list _origin_iters; std::unique_ptr _schema; @@ -247,18 +251,18 @@ Status MergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); } - _schema.reset(new Schema(_origin_iters[0]->schema())); - _merge_heap.reset(new MergeHeap); + _schema.reset(new Schema((*(_origin_iters.begin()))->schema())); + _merge_heap.reset(new MergeHeap); for (auto iter : _origin_iters) { std::unique_ptr ctx(new MergeIteratorContext(iter, _mem_tracker)); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; } - _merge_heap->push(ctx.get()); - _merge_ctxs.push_back(ctx.release()); + _merge_heap->push(ctx.release()); } + _origin_iters.clear(); return Status::OK(); } @@ -279,6 +283,9 @@ Status MergeIterator::next_batch(RowBlockV2* block) { RETURN_IF_ERROR(ctx->advance()); if (ctx->valid()) { _merge_heap->push(ctx); + } else { + // Release ctx earlier to reduce resource consumed + delete ctx; } } block->set_num_rows(row_idx); @@ -296,7 +303,8 @@ class UnionIterator : public RowwiseIterator { // Iterators' ownership it transfered to this class. // This class will delete all iterators when destructs // Client should not use iterators any more. - UnionIterator(std::vector iters, std::shared_ptr parent) : _origin_iters(std::move(iters)) { + UnionIterator(std::list iters, std::shared_ptr parent) + : _origin_iters(std::move(iters)) { _mem_tracker = MemTracker::CreateTracker(-1, "UnionIterator", parent, false); } @@ -308,46 +316,54 @@ class UnionIterator : public RowwiseIterator { Status init(const StorageReadOptions& opts) override; Status next_batch(RowBlockV2* block) override; - const Schema& schema() const override { return _origin_iters[0]->schema(); } + const Schema& schema() const override { return *_schema; } private: - std::vector _origin_iters; - size_t _iter_idx = 0; + std::unique_ptr _schema; + RowwiseIterator* _cur_iter = nullptr; + std::list _origin_iters; }; Status UnionIterator::init(const StorageReadOptions& opts) { + if (_origin_iters.empty()) { + return Status::OK(); + } + for (auto iter : _origin_iters) { RETURN_IF_ERROR(iter->init(opts)); } + _schema.reset(new Schema((*(_origin_iters.begin()))->schema())); + _cur_iter = *(_origin_iters.begin()); return Status::OK(); } Status UnionIterator::next_batch(RowBlockV2* block) { - if (_iter_idx >= _origin_iters.size()) { - return Status::EndOfFile("End of UnionIterator"); - } - do { - auto iter = _origin_iters[_iter_idx]; - auto st = iter->next_batch(block); + while (_cur_iter != nullptr) { + auto st = _cur_iter->next_batch(block); if (st.is_end_of_file()) { - _iter_idx++; + delete _cur_iter; + _cur_iter = nullptr; + _origin_iters.pop_front(); + if (!_origin_iters.empty()) { + _cur_iter = *(_origin_iters.begin()); + } } else { return st; } - } while (_iter_idx < _origin_iters.size()); + } return Status::EndOfFile("End of UnionIterator"); } -RowwiseIterator* new_merge_iterator(std::vector inputs, std::shared_ptr parent) { +RowwiseIterator* new_merge_iterator(std::list inputs, std::shared_ptr parent) { if (inputs.size() == 1) { - return inputs[0]; + return *(inputs.begin()); } return new MergeIterator(std::move(inputs), parent); } -RowwiseIterator* new_union_iterator(std::vector inputs, std::shared_ptr parent) { +RowwiseIterator* new_union_iterator(std::list inputs, std::shared_ptr parent) { if (inputs.size() == 1) { - return inputs[0]; + return *(inputs.begin()); } return new UnionIterator(std::move(inputs), parent); } diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h index 9f95d03d1f4a91..139294217bfad2 100644 --- a/be/src/olap/generic_iterators.h +++ b/be/src/olap/generic_iterators.h @@ -25,14 +25,14 @@ namespace doris { // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_merge_iterator(std::vector inputs, std::shared_ptr parent); +RowwiseIterator* new_merge_iterator(std::list inputs, std::shared_ptr parent); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. // // Inputs iterators' ownership is taken by created union iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_union_iterator(std::vector inputs, std::shared_ptr parent); +RowwiseIterator* new_union_iterator(std::list inputs, std::shared_ptr parent); // Create an auto increment iterator which returns num_rows data in format of schema. // This class aims to be used in unit test. diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 932dcf60853cbe..69301ea06ee0d3 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -48,6 +48,7 @@ OLAPStatus Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, "failed to init row cursor when merging rowsets of tablet " + tablet->full_name()); row_cursor.allocate_memory_for_string_type(tablet->tablet_schema()); + // TODO(yingchun): monitor std::shared_ptr tracker(new MemTracker(-1)); std::unique_ptr mem_pool(new MemPool(tracker.get())); diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 39081eee719f73..ce88f23e4ca19c 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -102,6 +102,7 @@ Reader::~Reader() { } OLAPStatus Reader::init(const ReaderParams& read_params) { + // TODO(yingchun): monitor _tracker.reset(new MemTracker(-1, read_params.tablet->full_name())); _predicate_mem_pool.reset(new MemPool(_tracker.get())); @@ -115,7 +116,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return res; } - res = _capture_rs_readers(read_params); + std::vector rs_readers; + res = _capture_rs_readers(read_params, &rs_readers); if (res != OLAP_SUCCESS) { LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res << ", tablet_id:" << read_params.tablet->tablet_id() @@ -125,11 +127,36 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return res; } - // When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation + if (_optimize_for_single_rowset(rs_readers)) { + _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row + : &Reader::_direct_next_row; + return OLAP_SUCCESS; + } + + switch (_tablet->keys_type()) { + case KeysType::DUP_KEYS: + _next_row_func = &Reader::_direct_next_row; + break; + case KeysType::UNIQUE_KEYS: + _next_row_func = &Reader::_unique_key_next_row; + break; + case KeysType::AGG_KEYS: + _next_row_func = &Reader::_agg_key_next_row; + break; + default: + DCHECK(false) << "No next row function for type:" << _tablet->keys_type(); + break; + } + + return OLAP_SUCCESS; +} + +// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation +bool Reader::_optimize_for_single_rowset(const std::vector& rs_readers) { bool has_delete_rowset = false; bool has_overlapping = false; int nonoverlapping_count = 0; - for (auto rs_reader : _rs_readers) { + for (const auto& rs_reader : rs_readers) { if (rs_reader->rowset()->rowset_meta()->delete_flag()) { has_delete_rowset = true; break; @@ -144,27 +171,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { } } } - if (!has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset) { - _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row - : &Reader::_direct_next_row; - } else { - switch (_tablet->keys_type()) { - case KeysType::DUP_KEYS: - _next_row_func = &Reader::_direct_next_row; - break; - case KeysType::UNIQUE_KEYS: - _next_row_func = &Reader::_unique_key_next_row; - break; - case KeysType::AGG_KEYS: - _next_row_func = &Reader::_agg_key_next_row; - break; - default: - break; - } - } - DCHECK(_next_row_func != nullptr) << "No next row function for type:" << _tablet->keys_type(); - return OLAP_SUCCESS; + return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset; } OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, @@ -180,6 +188,7 @@ OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, Ob } return OLAP_SUCCESS; } + OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) { if (UNLIKELY(_next_key == nullptr)) { @@ -188,7 +197,7 @@ OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_ } init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); auto res = _collect_iter->next(&_next_key, &_next_delete_flag); - if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { + if (UNLIKELY(res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF)) { return res; } if (_need_agg_finalize) { @@ -301,7 +310,8 @@ void Reader::close() { } } -OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { +OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params, + std::vector* valid_rs_readers) { const std::vector* rs_readers = &read_params.rs_readers; if (rs_readers->empty()) { LOG(WARNING) << "fail to acquire data sources. tablet=" << _tablet->full_name(); @@ -399,10 +409,10 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { return res; } if (res == OLAP_SUCCESS) { - _rs_readers.push_back(rs_reader); + valid_rs_readers->push_back(rs_reader); } } - _collect_iter->build_heap(); + _collect_iter->build_heap(*valid_rs_readers); _next_key = _collect_iter->current_row(&_next_delete_flag); return OLAP_SUCCESS; } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 788e48cb76ddd3..766f8ec4f054d0 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -124,7 +124,9 @@ class Reader { OLAPStatus _init_params(const ReaderParams& read_params); - OLAPStatus _capture_rs_readers(const ReaderParams& read_params); + OLAPStatus _capture_rs_readers(const ReaderParams& read_params, std::vector* valid_rs_readers); + + bool _optimize_for_single_rowset(const std::vector& rs_readers); OLAPStatus _init_keys_param(const ReaderParams& read_params); @@ -174,7 +176,6 @@ class Reader { std::vector _seek_columns; TabletSharedPtr _tablet; - std::vector _rs_readers; RowsetReaderContext _reader_context; KeysParam _keys_param; std::vector _is_lower_keys_included; diff --git a/be/src/olap/row_block.h b/be/src/olap/row_block.h index b44a55014e309f..75924fa63228de 100644 --- a/be/src/olap/row_block.h +++ b/be/src/olap/row_block.h @@ -72,6 +72,8 @@ class RowBlock { cursor->attach(_mem_buf + row_index * _mem_row_bytes); } + // TODO(yingchun): why not use _pos directly? + template inline void set_row(uint32_t row_index, const RowType& row) const { memcpy(_mem_buf + row_index * _mem_row_bytes, row.row_ptr(), _mem_row_bytes); diff --git a/be/src/olap/row_cursor.cpp b/be/src/olap/row_cursor.cpp index ca2f690f5063fd..e76d9bea40cc1d 100644 --- a/be/src/olap/row_cursor.cpp +++ b/be/src/olap/row_cursor.cpp @@ -165,6 +165,7 @@ OLAPStatus RowCursor::init_scan_key(const TabletSchema& schema, return OLAP_SUCCESS; } +// TODO(yingchun): parameter 'const TabletSchema& schema' is not used OLAPStatus RowCursor::allocate_memory_for_string_type(const TabletSchema& schema) { // allocate memory for string type(char, varchar, hll) // The memory allocated in this function is used in aggregate and copy function diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp b/be/src/olap/rowset/alpha_rowset_reader.cpp index 1abc6dafef5ec1..9a72daee880137 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.cpp +++ b/be/src/olap/rowset/alpha_rowset_reader.cpp @@ -37,6 +37,15 @@ AlphaRowsetReader::AlphaRowsetReader(int num_rows_per_row_block, AlphaRowsetShar AlphaRowsetReader::~AlphaRowsetReader() { delete _dst_cursor; _rowset->release(); + while (!_merge_heap.empty()) { + auto ctx = _merge_heap.top(); + _merge_heap.pop(); + delete ctx; + } + for (auto ctx : _sequential_ctxs) { + delete ctx; + } + _sequential_ctxs.clear(); } OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { @@ -50,15 +59,14 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { } _is_segments_overlapping = _alpha_rowset_meta->is_segments_overlapping(); - _ordinal = 0; RETURN_NOT_OK(_init_merge_ctxs(read_context)); // needs to sort merge only when // 1) we are told to return sorted result (need_ordered_result) - // 2) we have several segment groups (_is_segments_overlapping && _merge_ctxs.size() > 1) + // 2) we have several segment groups (_is_segments_overlapping && _sequential_ctxs.size() > 1) if (_current_read_context->need_ordered_result && _is_segments_overlapping && - _merge_ctxs.size() > 1) { + _sequential_ctxs.size() > 1) { _next_block = &AlphaRowsetReader::_merge_block; _read_block.reset(new (std::nothrow) RowBlock(_current_read_context->tablet_schema, _parent_tracker)); @@ -79,22 +87,23 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) { // Upon rollup/alter table, seek_columns is nullptr. // Under this circumstance, init RowCursor with all columns. _dst_cursor->init(*(_current_read_context->tablet_schema)); - for (size_t i = 0; i < _merge_ctxs.size(); ++i) { - _merge_ctxs[i].row_cursor.reset(new (std::nothrow) RowCursor()); - _merge_ctxs[i].row_cursor->init(*(_current_read_context->tablet_schema)); + for (auto ctx : _sequential_ctxs) { + ctx->row_cursor.reset(new (std::nothrow) RowCursor()); + ctx->row_cursor->init(*(_current_read_context->tablet_schema)); } } else { _dst_cursor->init(*(_current_read_context->tablet_schema), *(_current_read_context->seek_columns)); - for (size_t i = 0; i < _merge_ctxs.size(); ++i) { - _merge_ctxs[i].row_cursor.reset(new (std::nothrow) RowCursor()); - _merge_ctxs[i].row_cursor->init(*(_current_read_context->tablet_schema), - *(_current_read_context->seek_columns)); + for (auto ctx : _sequential_ctxs) { + ctx->row_cursor.reset(new (std::nothrow) RowCursor()); + ctx->row_cursor->init(*(_current_read_context->tablet_schema), + *(_current_read_context->seek_columns)); } } RETURN_NOT_OK(_init_merge_heap()); } else { _next_block = &AlphaRowsetReader::_union_block; + _cur_ctx = *(_sequential_ctxs.begin()); } return OLAP_SUCCESS; } @@ -120,20 +129,24 @@ int64_t AlphaRowsetReader::filtered_rows() { } OLAPStatus AlphaRowsetReader::_union_block(RowBlock** block) { - while (_ordinal < _merge_ctxs.size()) { + while (_cur_ctx != nullptr) { // union block only use one block to store - OLAPStatus status = _pull_next_block(&(_merge_ctxs[_ordinal])); + OLAPStatus status = _pull_next_block(_cur_ctx); if (status == OLAP_ERR_DATA_EOF) { - _ordinal++; - continue; + delete _cur_ctx; + _cur_ctx = nullptr; + _sequential_ctxs.pop_front(); + if (!_sequential_ctxs.empty()) { + _cur_ctx = *(_sequential_ctxs.begin()); + } } else if (status != OLAP_SUCCESS) { return status; } else { - (*block) = _merge_ctxs[_ordinal].row_block; + (*block) = _cur_ctx->row_block; return OLAP_SUCCESS; } } - if (_ordinal == _merge_ctxs.size()) { + if (_sequential_ctxs.empty()) { *block = nullptr; return OLAP_ERR_DATA_EOF; } @@ -148,6 +161,7 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { _read_block->clear(); size_t num_rows_in_block = 0; while (_read_block->pos() < _num_rows_per_row_block) { + // 1. Read one row from heap RowCursor* row_cursor = nullptr; status = _pull_next_row_for_merge_rowset_v2(&row_cursor); if (status == OLAP_ERR_DATA_EOF && _read_block->pos() > 0) { @@ -159,11 +173,13 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { VLOG_TRACE << "get merged row: " << row_cursor->to_string(); + // 2. Copy the row to buffer block _read_block->get_row(_read_block->pos(), _dst_cursor); copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool()); _read_block->pos_inc(); num_rows_in_block++; + // 3. Adjust heap // MergeHeap should advance one step after row been read. // This function must be called after copy_row // Otherwise, the row has read will be modified instantly before handled. @@ -174,6 +190,7 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { // The returned row will be (2, 2) instead of (1, 1) AlphaMergeContext* merge_ctx = _merge_heap.top(); _merge_heap.pop(); + // merge_ctx will not be pushed back into heap if it is EOF RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx)); } _read_block->set_pos(0); @@ -184,17 +201,19 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) { } OLAPStatus AlphaRowsetReader::_init_merge_heap() { - if (_merge_heap.empty() && !_merge_ctxs.empty()) { - for (auto& merge_ctx : _merge_ctxs) { - RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(&merge_ctx)); - } + DCHECK(_merge_heap.empty()); + DCHECK(!_sequential_ctxs.empty()); + for (auto merge_ctx : _sequential_ctxs) { + RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx)); } + _sequential_ctxs.clear(); return OLAP_SUCCESS; } OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx) { - if (merge_ctx->is_eof) { - // nothing in this merge ctx, just return + if (OLAP_UNLIKELY(merge_ctx->is_eof)) { + // nothing in this merge ctx, release and return + delete merge_ctx; return OLAP_SUCCESS; } @@ -202,9 +221,11 @@ OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeC if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) { OLAPStatus status = _pull_next_block(merge_ctx); if (status == OLAP_ERR_DATA_EOF) { - merge_ctx->is_eof = true; + // nothing in this merge ctx, release and return + delete merge_ctx; return OLAP_SUCCESS; } else if (status != OLAP_SUCCESS) { + delete merge_ctx; LOG(WARNING) << "read next row of singleton rowset failed:" << status; return status; } @@ -221,7 +242,7 @@ OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeC OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row) { // if _merge_heap is not empty, return the row at top, and insert a new row // from corresponding merge_ctx - if (!_merge_heap.empty()) { + if (OLAP_LIKELY(!_merge_heap.empty())) { AlphaMergeContext* merge_ctx = _merge_heap.top(); *row = merge_ctx->row_cursor.get(); // Must not rebuild merge_heap in this place. @@ -234,39 +255,6 @@ OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row } } -OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset(RowCursor** row) { - RowCursor* min_row = nullptr; - int min_index = -1; - - size_t ordinal = 0; - while (ordinal < _merge_ctxs.size()) { - AlphaMergeContext* merge_ctx = &(_merge_ctxs[ordinal]); - if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) { - OLAPStatus status = _pull_next_block(merge_ctx); - if (status == OLAP_ERR_DATA_EOF) { - _merge_ctxs.erase(_merge_ctxs.begin() + ordinal); - continue; - } else if (status != OLAP_SUCCESS) { - LOG(WARNING) << "read next row of singleton rowset failed:" << status; - return status; - } - } - RowCursor* current_row = merge_ctx->row_cursor.get(); - merge_ctx->row_block->get_row(merge_ctx->row_block->pos(), current_row); - if (min_row == nullptr || compare_row(*min_row, *current_row) > 0) { - min_row = current_row; - min_index = ordinal; - } - ordinal++; - } - if (min_row == nullptr || min_index == -1) { - return OLAP_ERR_DATA_EOF; - } - *row = min_row; - _merge_ctxs[min_index].row_block->pos_inc(); - return OLAP_SUCCESS; -} - OLAPStatus AlphaRowsetReader::_pull_next_block(AlphaMergeContext* merge_ctx) { OLAPStatus status = OLAP_SUCCESS; if (OLAP_UNLIKELY(merge_ctx->first_read_symbol)) { @@ -394,14 +382,14 @@ OLAPStatus AlphaRowsetReader::_init_merge_ctxs(RowsetReaderContext* read_context << new_column_data->version(); new_column_data->set_delete_status(DEL_NOT_SATISFIED); } - AlphaMergeContext merge_ctx; - merge_ctx.column_data = std::move(new_column_data); - _merge_ctxs.emplace_back(std::move(merge_ctx)); + auto merge_ctx = new AlphaMergeContext(); + merge_ctx->column_data = std::move(new_column_data); + _sequential_ctxs.emplace_back(merge_ctx); } - if (!_is_segments_overlapping && _merge_ctxs.size() > 1) { + if (!_is_segments_overlapping && _sequential_ctxs.size() > 1) { LOG(WARNING) << "invalid column_data for cumulative rowset. column_data size:" - << _merge_ctxs.size(); + << _sequential_ctxs.size(); return OLAP_ERR_READER_READING_ERROR; } return OLAP_SUCCESS; diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h index 53f1aef969e324..1667449dcfde83 100644 --- a/be/src/olap/rowset/alpha_rowset_reader.h +++ b/be/src/olap/rowset/alpha_rowset_reader.h @@ -80,7 +80,6 @@ class AlphaRowsetReader : public RowsetReader { OLAPStatus _union_block(RowBlock** block); OLAPStatus _merge_block(RowBlock** block); - OLAPStatus _pull_next_row_for_merge_rowset(RowCursor** row); OLAPStatus _pull_next_block(AlphaMergeContext* merge_ctx); // Doris will split query predicates to several scan keys @@ -89,8 +88,6 @@ class AlphaRowsetReader : public RowsetReader { OLAPStatus _pull_first_block(AlphaMergeContext* merge_ctx); // merge by priority queue(_merge_heap) - // this method has same function with _pull_next_row_for_merge_rowset, but using heap merge. - // and this should replace the _pull_next_row_for_merge_rowset later. OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row); // init the merge heap, this should be call before calling _pull_next_row_for_merge_rowset_v2(); OLAPStatus _init_merge_heap(); @@ -108,7 +105,10 @@ class AlphaRowsetReader : public RowsetReader { AlphaRowsetMeta* _alpha_rowset_meta; const std::vector>& _segment_groups; - std::vector _merge_ctxs; + // In '_union_block' mode, it has items to traverse. + // In '_merge_block' mode, it will be cleared after '_merge_heap' has been built. + std::list _sequential_ctxs; + std::unique_ptr _read_block; OLAPStatus (AlphaRowsetReader::*_next_block)(RowBlock** block) = nullptr; RowCursor* _dst_cursor = nullptr; @@ -119,16 +119,15 @@ class AlphaRowsetReader : public RowsetReader { // into consideration deliberately. bool _is_segments_overlapping; - // ordinal of ColumnData upon reading - size_t _ordinal; + // Current AlphaMergeContext to read data, just valid in '_union_block' mode. + AlphaMergeContext* _cur_ctx = nullptr; + // A priority queue for merging rowsets, just valid in '_merge_block' mode. + std::priority_queue, AlphaMergeContextComparator> + _merge_heap; RowsetReaderContext* _current_read_context; OlapReaderStatistics _owned_stats; OlapReaderStatistics* _stats = &_owned_stats; - - // a priority queue for merging rowsets - std::priority_queue, AlphaMergeContextComparator> - _merge_heap; }; } // namespace doris diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index cbfecfa2e89e05..d0e74966f06d7a 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -98,7 +98,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { } seg_iterators.push_back(std::move(iter)); } - std::vector iterators; + std::list iterators; for (auto& owned_it : seg_iterators) { // transfer ownership of segment iterator to `_iterator` iterators.push_back(owned_it.release()); diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 46dbf06c8da255..e8a1defe771b34 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -73,7 +73,6 @@ using std::list; using std::map; using std::nothrow; using std::pair; -using std::priority_queue; using std::set; using std::set_difference; using std::string; diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp index e4271b6fd149b1..072d589fddf8d6 100644 --- a/be/test/olap/generic_iterators_test.cpp +++ b/be/test/olap/generic_iterators_test.cpp @@ -77,7 +77,7 @@ TEST(GenericIteratorsTest, AutoIncrement) { TEST(GenericIteratorsTest, Union) { auto schema = create_schema(); - std::vector inputs; + std::list inputs; inputs.push_back(new_auto_increment_iterator(schema, 100)); inputs.push_back(new_auto_increment_iterator(schema, 200)); @@ -116,7 +116,7 @@ TEST(GenericIteratorsTest, Union) { TEST(GenericIteratorsTest, Merge) { auto schema = create_schema(); - std::vector inputs; + std::list inputs; inputs.push_back(new_auto_increment_iterator(schema, 100)); inputs.push_back(new_auto_increment_iterator(schema, 200));