Skip to content

Commit

Permalink
cbo
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Nov 18, 2020
1 parent ae710f8 commit 559876d
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 66 deletions.
15 changes: 13 additions & 2 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ void DeleteHandler::_merge_del_conds() {

for (const auto& del_cond : _del_conds) {
DCHECK_LE(del_cond.filter_version, _version);
_merged_del_conds.del_cond->merge_condition(del_cond.del_cond->columns());
_merged_del_conds.del_cond->merge_condition(del_cond.del_cond->sorted_conds());
}
}

Expand Down Expand Up @@ -299,7 +299,18 @@ OLAPStatus DeleteHandler::init(const TabletSchema& schema,
_del_conds.push_back(temp);
}

_merge_del_conds();
for (auto& del_cond : _del_conds) {
del_cond.del_cond->normalize();
}

// Evaluate lower-cost delete conditions first.
std::sort(_del_conds.begin(), _del_conds.end(),
[] (const DeleteConditions& left,
const DeleteConditions& right) {
return left.del_cond->eval_cost() < right.del_cond->eval_cost();
});

// _merge_del_conds();

_is_inited = true;

Expand Down
100 changes: 76 additions & 24 deletions be/src/olap/olap_cond.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,28 @@ bool Cond::eval(const segment_v2::BloomFilter* bf) const {
return true;
}

int Cond::eval_cost() const {
switch (op) {
case OP_NULL:
case OP_ALL:
return 0;
case OP_IS:
return 1;
case OP_EQ:
case OP_NE:
case OP_LT:
case OP_LE:
case OP_GT:
case OP_GE:
return 2 * operand_field->field_size();
case OP_IN:
case OP_NOT_IN:
return 3 * operand_field->field_size();
default:
return 0;
}
}

// PRECONDITION 1. index is valid; 2. at least has one operand
OLAPStatus CondColumn::add_cond(const TCondition& tcond, const TabletColumn& column) {
auto cond = std::make_shared<Cond>();
Expand Down Expand Up @@ -882,6 +904,19 @@ void CondColumn::merge_cond(const CondColumn& cond_col) {
}
}

// Reorders this column's conditions by ascending evaluation cost and caches
// the sum of their costs in _cost (exposed through eval_cost()).
void CondColumn::normalize() {
    // Cheaper conditions go first.
    const auto cheaper = [] (const std::shared_ptr<Cond>& lhs,
                             const std::shared_ptr<Cond>& rhs) {
        return lhs->eval_cost() < rhs->eval_cost();
    };
    std::sort(_conds.begin(), _conds.end(), cheaper);

    // Accumulate in the sorted order, then publish the total.
    double total = 0.0;
    for (auto it = _conds.begin(); it != _conds.end(); ++it) {
        total += (*it)->eval_cost();
    }
    _cost = total;
}

bool CondColumn::eval(const RowCursor& row) const {
auto cell = row.cell(_col_index);
for (auto& each_cond : _conds) {
Expand Down Expand Up @@ -976,45 +1011,62 @@ OLAPStatus Conditions::append_condition(const TCondition& tcond) {
}

CondColumn* cond_col = nullptr;
auto it = _columns.find(index);
if (it == _columns.end()) {
auto it = _index_conds.find(index);
if (it == _index_conds.end()) {
cond_col = new CondColumn(*_schema, index);
_columns[index] = cond_col;
_index_conds[index] = cond_col;
} else {
cond_col = it->second;
}

return cond_col->add_cond(tcond, column);
}

void Conditions::merge_condition(const CondColumns& cond_cols) {
void Conditions::normalize() {
for (const auto& cond_column : _index_conds) {
_sorted_conds.push_back(cond_column.second);
}

// Do lower cost evaluation at first.
std::sort(_sorted_conds.begin(), _sorted_conds.end(),
[] (CondColumn* left,
CondColumn* right) {
return left->eval_cost() < right->eval_cost();
});
_cost = 0.0;
for (const auto& cond : _sorted_conds) {
_cost += cond->eval_cost();
}
}

void Conditions::merge_condition(const std::vector<CondColumn*>& cond_cols) {
for (const auto& cond_col : cond_cols) {
int32_t index = cond_col.first;
auto it = _columns.find(index);
if (it == _columns.end()) {
int32_t index = cond_col->col_index();
auto it = _index_conds.find(index);
if (it == _index_conds.end()) {
CondColumn* old_cond_col = new CondColumn(*_schema, index);
old_cond_col->_conds = cond_col.second->conds();
_columns[index] = old_cond_col;
old_cond_col->_conds = cond_col->conds();
_index_conds[index] = old_cond_col;
} else {
it->second->merge_cond(*(cond_col.second));
it->second->merge_cond(*cond_col);
}
}
}

bool Conditions::delete_conditions_eval(const RowCursor& row) const {
if (_columns.empty()) {
if (_sorted_conds.empty()) {
return false;
}

for (auto& each_cond : _columns) {
for (auto& each_cond : _sorted_conds) {
// TODO(yingchun): why only judge key and dup?
if (_cond_column_is_key_or_duplicate(each_cond.second) && !each_cond.second->eval(row)) {
if (_cond_column_is_key_or_duplicate(each_cond) && !each_cond->eval(row)) {
return false;
}
}

VLOG(3) << "Row meets the delete conditions. "
<< "condition_count=" << _columns.size()
<< "condition_count=" << _sorted_conds.size()
<< ", row=" << row.to_string();
return true;
}
Expand All @@ -1023,9 +1075,9 @@ bool Conditions::rowset_pruning_filter(const std::vector<KeyRange>& zone_maps) c
// ZoneMap will store min/max of rowset.
// The function is to filter rowset using ZoneMaps
// and query predicates.
for (auto& cond_it : _columns) {
if (_cond_column_is_key_or_duplicate(cond_it.second)) {
if (cond_it.first < zone_maps.size() && !cond_it.second->eval(zone_maps.at(cond_it.first))) {
for (auto& cond_it : _sorted_conds) {
if (_cond_column_is_key_or_duplicate(cond_it)) {
if (cond_it->col_index() < zone_maps.size() && !cond_it->eval(zone_maps.at(cond_it->col_index()))) {
return true;
}
}
Expand All @@ -1034,7 +1086,7 @@ bool Conditions::rowset_pruning_filter(const std::vector<KeyRange>& zone_maps) c
}

int Conditions::delete_pruning_filter(const std::vector<KeyRange>& zone_maps) const {
if (_columns.empty()) {
if (_sorted_conds.empty()) {
return DEL_NOT_SATISFIED;
}

Expand All @@ -1049,19 +1101,19 @@ int Conditions::delete_pruning_filter(const std::vector<KeyRange>& zone_maps) co
int ret = DEL_NOT_SATISFIED;
bool del_partial_satisfied = false;
bool del_not_satisfied = false;
for (auto& cond_it : _columns) {
for (auto& cond_it : _sorted_conds) {
/*
* this is based on the assumption that the delete condition
* is only about key field, not about value field except the storage model is duplicate.
*/
if (!_cond_column_is_key_or_duplicate(cond_it.second) || cond_it.first > zone_maps.size()) {
if (!_cond_column_is_key_or_duplicate(cond_it) || cond_it->col_index() > zone_maps.size()) {
LOG(WARNING) << "where condition not equal column statistics size. "
<< "cond_id=" << cond_it.first << ", zone_map_size=" << zone_maps.size();
<< "cond_id=" << cond_it->col_index() << ", zone_map_size=" << zone_maps.size();
del_partial_satisfied = true;
continue;
}

int del_ret = cond_it.second->del_eval(zone_maps.at(cond_it.first));
int del_ret = cond_it->del_eval(zone_maps.at(cond_it->col_index()));
if (DEL_SATISFIED == del_ret) {
continue;
} else if (DEL_PARTIAL_SATISFIED == del_ret) {
Expand All @@ -1083,8 +1135,8 @@ int Conditions::delete_pruning_filter(const std::vector<KeyRange>& zone_maps) co
}

CondColumn* Conditions::get_column(int32_t cid) const {
auto iter = _columns.find(cid);
if (iter != _columns.end()) {
auto iter = _index_conds.find(cid);
if (iter != _index_conds.end()) {
return iter->second;
}
return nullptr;
Expand Down
52 changes: 43 additions & 9 deletions be/src/olap/olap_cond.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ struct Cond {
return op == OP_EQ || op == OP_IN || op == OP_IS;
}

int eval_cost() const;

CondOp op = OP_NULL;
// valid when op is not OP_IN and OP_NOT_IN
std::shared_ptr<WrapperField> operand_field;
Expand All @@ -115,6 +117,8 @@ class CondColumn {
OLAPStatus add_cond(const TCondition& tcond, const TabletColumn& column);
void merge_cond(const CondColumn& cond_col);

void normalize();

// For the specified column of a row, compare against all filter conditions; if all conditions are satisfied, filter out this row
// Return true means this row should be filtered out, otherwise return false
bool eval(const RowCursor& row) const;
Expand Down Expand Up @@ -150,14 +154,23 @@ class CondColumn {
return _conds;
}

double eval_cost() const {
return _cost;
}

int32_t col_index() const {
return _col_index;
}

private:
friend class Conditions;

bool _is_key;
int32_t _col_index;
bool _is_key = false;
int32_t _col_index = 0;
// TODO(yingchun): DELETE FROM xx WHERE col1 > a AND col1 < b
// Conds in _conds are in 'AND' relationship
std::vector<std::shared_ptr<Cond>> _conds;
double _cost = 0.0;
};

// Conditions associated with one request
Expand All @@ -173,10 +186,10 @@ class Conditions {
}

void finalize() {
for (auto& it : _columns) {
for (auto& it : _index_conds) {
delete it.second;
}
_columns.clear();
_index_conds.clear();
}

// TODO(yingchun): should do it in constructor
Expand All @@ -190,7 +203,9 @@ class Conditions {
// 2. the column type is double or float
OLAPStatus append_condition(const TCondition& condition);

void merge_condition(const CondColumns& cond_cols);
void normalize();

void merge_condition(const std::vector<CondColumn*>& cond_cols);

// Filter the RowCursor using the delete conditions on all columns
// Return true means this row should be filtered out, otherwise return false
Expand All @@ -202,12 +217,28 @@ class Conditions {
// Whether the rowset satisfied delete condition
int delete_pruning_filter(const std::vector<KeyRange>& zone_maps) const;

const CondColumns& columns() const {
return _columns;
const CondColumns& index_conds() const {
return _index_conds;
}

CondColumn* col_cond(int32_t col_index) const {
auto col_cond = _index_conds.find(col_index);
if (col_cond == _index_conds.end()) {
return nullptr;
}
return col_cond->second;
}

const std::vector<CondColumn*>& sorted_conds() const {
return _sorted_conds;
}

CondColumn* get_column(int32_t cid) const;

double eval_cost() const {
return _cost;
}

private:
bool _cond_column_is_key_or_duplicate(const CondColumn* cc) const {
return cc->is_key() || _schema->keys_type() == KeysType::DUP_KEYS;
Expand All @@ -216,8 +247,11 @@ class Conditions {
private:
const TabletSchema* _schema = nullptr;
// TODO(yingchun): DELETE FROM xx WHERE col1 IN (a) AND col2 IN (b)
// CondColumns in _columns are in 'AND' relationship
CondColumns _columns;
// CondColumns in _index_conds are in 'AND' relationship
CondColumns _index_conds;

double _cost = 0.0;
std::vector<CondColumn*> _sorted_conds;
};

} // namespace doris
Expand Down
21 changes: 11 additions & 10 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,10 +626,10 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {
if (!_delete_handler.empty() && read_params.aggregation) {
set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (const auto& conds : _delete_handler.get_delete_conditions()) {
for (const auto& cond_column : conds.del_cond->columns()) {
if (column_set.find(cond_column.first) == column_set.end()) {
column_set.insert(cond_column.first);
_return_columns.push_back(cond_column.first);
for (const auto& cond_column : conds.del_cond->sorted_conds()) {
if (column_set.find(cond_column->col_index()) == column_set.end()) {
column_set.insert(cond_column->col_index());
_return_columns.push_back(cond_column->col_index());
}
}
}
Expand Down Expand Up @@ -673,8 +673,8 @@ OLAPStatus Reader::_init_return_columns(const ReaderParams& read_params) {

void Reader::_init_seek_columns() {
std::unordered_set<uint32_t> column_set(_return_columns.begin(), _return_columns.end());
for (auto& it : _conditions.columns()) {
column_set.insert(it.first);
for (auto& it : _conditions.sorted_conds()) {
column_set.insert(it->col_index());
}
size_t max_key_column_count = 0;
for (const auto& key : _keys_param.start_keys) {
Expand Down Expand Up @@ -758,6 +758,7 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) {
_col_predicates.push_back(predicate);
}
}
_conditions.normalize();
}

#define COMPARISON_PREDICATE_CONDITION_VALUE(NAME, PREDICATE) \
Expand Down Expand Up @@ -1002,14 +1003,14 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition) {

void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
// add all columns with condition to _load_bf_columns
for (const auto& cond_column : _conditions.columns()) {
if (!_tablet->tablet_schema().column(cond_column.first).is_bf_column()) {
for (const auto& cond_column : _conditions.sorted_conds()) {
if (!_tablet->tablet_schema().column(cond_column->col_index()).is_bf_column()) {
continue;
}
for (const auto& cond : cond_column.second->conds()) {
for (const auto& cond : cond_column->conds()) {
if (cond->op == OP_EQ
|| (cond->op == OP_IN && cond->operand_set.size() < MAX_OP_IN_FIELD_NUM)) {
_load_bf_columns.insert(cond_column.first);
_load_bf_columns.insert(cond_column->col_index());
}
}
}
Expand Down
Loading

0 comments on commit 559876d

Please sign in to comment.