Skip to content

Commit

Permalink
[Enhancement] Reduce memory consumption by release readers earier
Browse files Browse the repository at this point in the history
We created multiple rowset readers to read data of one tablet,
after one rowset reader has reached EOF, it can be released to
reduce resource (typicallly memory) comsumption.
As the same, we can release segment reader when it reach EOF.
  • Loading branch information
acelyc111 committed Jun 8, 2021
1 parent 8ea0ae4 commit 9f4fc32
Show file tree
Hide file tree
Showing 16 changed files with 221 additions and 201 deletions.
82 changes: 43 additions & 39 deletions be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,55 +44,62 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
return OLAP_ERR_DATA_EOF;
}

LevelIterator* child_ptr = child.release();
_children.push_back(child_ptr);
_children.push_back(child.release());
return OLAP_SUCCESS;
}

// Build a merge heap. If _merge is true, a rowset with the max rownum
// status will be used as the base rowset, and the other rowsets will be merged first and
// then merged with the base rowset.
void CollectIterator::build_heap() {
DCHECK(_reader->_rs_readers.size() == _children.size());
void CollectIterator::build_heap(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
DCHECK(rs_readers.size() == _children.size());
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
if (_children.empty()) {
_inner_iter.reset(nullptr);
return;
} else if (_merge) {
DCHECK(!_reader->_rs_readers.empty());
// build merge heap with two children a base rowset as level0iterator and
DCHECK(!rs_readers.empty());
// build merge heap with two children, a base rowset as level0iterator and
// other cumulative rowsets as a level1iterator
if (_children.size() > 1) {
// find base rowset(max rownum),
RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0];
// find 'base rowset', 'base rowset' is the rowset which contains the max row number
int64_t max_row_num = 0;
int base_reader_idx = 0;
for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) {
if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
base_reader->rowset()->rowset_meta()->num_rows()) {
base_reader = _reader->_rs_readers[i];
for (size_t i = 0; i < rs_readers.size(); ++i) {
int64_t cur_row_num = rs_readers[i]->rowset()->rowset_meta()->num_rows();
if (cur_row_num > max_row_num) {
max_row_num = cur_row_num;
base_reader_idx = i;
}
}
std::vector<LevelIterator*> cumu_children;
for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) {
auto base_reader_child = _children.begin();
std::advance(base_reader_child, base_reader_idx);

std::list<LevelIterator*> cumu_children;
int i = 0;
for (const auto& child : _children) {
if (i != base_reader_idx) {
cumu_children.push_back(_children[i]);
cumu_children.push_back(child);
}
++i;
}
Level1Iterator* cumu_iter =
new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse);
cumu_iter->init();
std::vector<LevelIterator*> children;
children.push_back(_children[base_reader_idx]);
std::list<LevelIterator*> children;
children.push_back(*base_reader_child);
children.push_back(cumu_iter);
_inner_iter.reset(new Level1Iterator(children, _merge, _reverse));
} else {
// _children.size() == 1
_inner_iter.reset(new Level1Iterator(_children, _merge, _reverse));
}
} else {
_inner_iter.reset(new Level1Iterator(_children, _merge, _reverse));
}
_inner_iter->init();
// Clear _children earlier to release any related references
_children.clear();
}

bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a,
Expand All @@ -114,16 +121,6 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a
return a->version() > b->version();
}

void CollectIterator::clear() {
for (auto child : _children) {
if (child != nullptr) {
delete child;
child = nullptr;
}
}
_children.clear();
}

const RowCursor* CollectIterator::current_row(bool* delete_flag) const {
if (LIKELY(_inner_iter)) {
return _inner_iter->current_row(delete_flag);
Expand Down Expand Up @@ -230,7 +227,7 @@ OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* de
}

CollectIterator::Level1Iterator::Level1Iterator(
const std::vector<CollectIterator::LevelIterator*>& children, bool merge, bool reverse)
const std::list<CollectIterator::LevelIterator*>& children, bool merge, bool reverse)
: _children(children), _merge(merge), _reverse(reverse) {}

CollectIterator::LevelIterator::~LevelIterator() {}
Expand All @@ -250,7 +247,7 @@ CollectIterator::Level1Iterator::~Level1Iterator() {
// OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached.
// Others when error happens
OLAPStatus CollectIterator::Level1Iterator::next(const RowCursor** row, bool* delete_flag) {
if (UNLIKELY(_children.size() == 0)) {
if (UNLIKELY(_cur_child == nullptr)) {
return OLAP_ERR_DATA_EOF;
}
if (_merge) {
Expand Down Expand Up @@ -284,23 +281,25 @@ int64_t CollectIterator::Level1Iterator::version() const {
}

OLAPStatus CollectIterator::Level1Iterator::init() {
if (_children.size() == 0) {
if (_children.empty()) {
return OLAP_SUCCESS;
}

// Only when there are multiple children that need to be merged
if (_merge && _children.size() > 1) {
_heap.reset(new MergeHeap(LevelIteratorComparator(_reverse)));
for (auto child : _children) {
if (child == nullptr || child->current_row() == nullptr) {
continue;
}
DCHECK(child != nullptr);
DCHECK(child->current_row() != nullptr);
_heap->push(child);
_cur_child = _heap->top();
}
_cur_child = _heap->top();
// Clear _children earlier to release any related references
_children.clear();
} else {
_merge = false;
_heap.reset(nullptr);
_cur_child = _children[_child_idx];
_cur_child = *(_children.begin());
}
return OLAP_SUCCESS;
}
Expand All @@ -313,13 +312,16 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor**
_heap->push(_cur_child);
_cur_child = _heap->top();
} else if (res == OLAP_ERR_DATA_EOF) {
// current child has been read, to read next
delete _cur_child;
if (!_heap->empty()) {
_cur_child = _heap->top();
} else {
_cur_child = nullptr;
return OLAP_ERR_DATA_EOF;
}
} else {
_cur_child = nullptr;
LOG(WARNING) << "failed to get next from child, res=" << res;
return res;
}
Expand All @@ -333,17 +335,19 @@ inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor*
if (LIKELY(res == OLAP_SUCCESS)) {
return OLAP_SUCCESS;
} else if (res == OLAP_ERR_DATA_EOF) {
// this child has been read, to read next
_child_idx++;
if (_child_idx < _children.size()) {
_cur_child = _children[_child_idx];
// current child has been read, to read next
delete _cur_child;
_children.pop_front();
if (!_children.empty()) {
_cur_child = *(_children.begin());
*row = _cur_child->current_row(delete_flag);
return OLAP_SUCCESS;
} else {
_cur_child = nullptr;
return OLAP_ERR_DATA_EOF;
}
} else {
_cur_child = nullptr;
LOG(WARNING) << "failed to get next from child, res=" << res;
return res;
}
Expand Down
45 changes: 22 additions & 23 deletions be/src/olap/collect_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class CollectIterator {

OLAPStatus add_child(RowsetReaderSharedPtr rs_reader);

void build_heap();
void build_heap(const std::vector<RowsetReaderSharedPtr>& rs_readers);

// Get top row of the heap, nullptr if reach end.
const RowCursor* current_row(bool* delete_flag) const;
Expand All @@ -47,9 +47,6 @@ class CollectIterator {
// Others when error happens
OLAPStatus next(const RowCursor** row, bool* delete_flag);

// Clear the MergeSet element and reset state.
void clear();

private:
// This interface is the actual implementation of the new version of iterator.
// It currently contains two implementations, one is Level0Iterator,
Expand All @@ -70,6 +67,7 @@ class CollectIterator {
virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0;
virtual ~LevelIterator() = 0;
};

// Compare row cursors between multiple merge elements,
// if row cursors equal, compare data version.
class LevelIteratorComparator {
Expand All @@ -79,7 +77,6 @@ class CollectIterator {

private:
bool _reverse;
OlapReaderStatistics* _stats;
};

typedef std::priority_queue<LevelIterator*, std::vector<LevelIterator*>,
Expand All @@ -90,15 +87,15 @@ class CollectIterator {
public:
Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader);

OLAPStatus init();
OLAPStatus init() override;

const RowCursor* current_row(bool* delete_flag) const;
const RowCursor* current_row(bool* delete_flag) const override;

const RowCursor* current_row() const;
const RowCursor* current_row() const override;

int64_t version() const;
int64_t version() const override;

OLAPStatus next(const RowCursor** row, bool* delete_flag);
OLAPStatus next(const RowCursor** row, bool* delete_flag) override;

~Level0Iterator();

Expand All @@ -109,36 +106,37 @@ class CollectIterator {
OLAPStatus _refresh_current_row_v2();

RowsetReaderSharedPtr _rs_reader;
const RowCursor* _current_row = nullptr;
const RowCursor* _current_row = nullptr; // It points to the returned row
bool _is_delete = false;
Reader* _reader = nullptr;
// point to rows inside `_row_block`
RowCursor _row_cursor;
RowCursor _row_cursor; // It points to rows inside `_row_block`, maybe not returned
RowBlock* _row_block = nullptr;
};

// Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed)
class Level1Iterator : public LevelIterator {
public:
Level1Iterator(const std::vector<LevelIterator*>& children, bool merge, bool reverse);
Level1Iterator(const std::list<LevelIterator*>& children, bool merge, bool reverse);

OLAPStatus init();
OLAPStatus init() override;

const RowCursor* current_row(bool* delete_flag) const;
const RowCursor* current_row(bool* delete_flag) const override;

const RowCursor* current_row() const;
const RowCursor* current_row() const override;

int64_t version() const;
int64_t version() const override;

OLAPStatus next(const RowCursor** row, bool* delete_flag);
OLAPStatus next(const RowCursor** row, bool* delete_flag) override;

~Level1Iterator();

private:
inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag);
inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag);

// each Level0Iterator corresponds to a rowset reader
const std::vector<LevelIterator*> _children;
// Each LevelIterator corresponds to a rowset reader,
// it will be cleared after '_heap' has been initilized when '_merge == true'.
std::list<LevelIterator*> _children;
// point to the Level0Iterator containing the next output row.
// null when CollectIterator hasn't been initialized or reaches EOF.
LevelIterator* _cur_child = nullptr;
Expand All @@ -158,8 +156,9 @@ class CollectIterator {

std::unique_ptr<LevelIterator> _inner_iter;

// each LevelIterator corresponds to a rowset reader
std::vector<LevelIterator*> _children;
// Each LevelIterator corresponds to a rowset reader,
// it will be cleared after '_inner_iter' has been initilized.
std::list<LevelIterator*> _children;

bool _merge = true;
bool _reverse = false;
Expand Down
12 changes: 5 additions & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
_tablet->compute_version_hash_from_rowsets(_input_rowsets, &_output_version_hash);

LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version.first << "-" << _output_version.second
<< ", permits: " << permits;
<< ", output_version=" << _output_version << ", permits: " << permits;

RETURN_NOT_OK(construct_output_rowset_writer());
RETURN_NOT_OK(construct_input_rowset_readers());
Expand All @@ -98,8 +97,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version.first << "-"
<< _output_version.second;
<< ", output_version=" << _output_version;
return res;
}
TRACE("merge rowsets finished");
Expand All @@ -109,8 +107,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
_output_rowset = _output_rs_writer->build();
if (_output_rowset == nullptr) {
LOG(WARNING) << "rowset writer build failed. writer version:"
<< ", output_version=" << _output_version.first << "-"
<< _output_version.second;
<< ", output_version=" << _output_version;
return OLAP_ERR_MALLOC_ERROR;
}
TRACE_COUNTER_INCREMENT("output_rowset_data_size", _output_rowset->data_disk_size());
Expand All @@ -128,6 +125,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {

// 5. update last success compaction time
int64_t now = UnixMillis();
// TODO(yingchun): do the judge in Tablet class
if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
_tablet->set_last_cumu_compaction_success_time(now);
} else {
Expand All @@ -141,7 +139,7 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
}

LOG(INFO) << "succeed to do " << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version.first << "-" << _output_version.second
<< ", output_version=" << _output_version
<< ", current_max_version=" << current_max_version
<< ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num
<< ". elapsed time=" << watch.get_elapse_second() << "s. cumulative_compaction_policy="
Expand Down
Loading

0 comments on commit 9f4fc32

Please sign in to comment.