Skip to content

Commit

Permalink
YQ-3846 RD support OR split during pushdown (ydb-platform#11439)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 11, 2024
1 parent 2c99cf0 commit 3235631
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 40 deletions.
23 changes: 13 additions & 10 deletions ydb/library/yql/providers/common/pushdown/physical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,29 @@ using namespace NNodes;

namespace {

TPredicateNode SplitForPartialPushdown(
const NPushdown::TPredicateNode& predicateTree,
TExprContext& ctx,
TPositionHandle pos) {
TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, TExprContext& ctx, TPositionHandle pos, const TSettings& settings) {
if (predicateTree.CanBePushed) {
return predicateTree;
}

if (predicateTree.Op != NPushdown::EBoolOp::And) {
return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer
if (predicateTree.Op != NPushdown::EBoolOp::And && (!settings.IsEnabled(TSettings::EFeatureFlag::SplitOrOperator) || predicateTree.Op != NPushdown::EBoolOp::Or)) {
// Predicate can't be split, so return invalid value and skip this branch
return NPushdown::TPredicateNode();
}

std::vector<NPushdown::TPredicateNode> pushable;
for (auto& predicate : predicateTree.Children) {
if (predicate.CanBePushed) {
pushable.emplace_back(predicate);
NPushdown::TPredicateNode pushablePredicate = SplitForPartialPushdown(predicate, ctx, pos, settings);
if (pushablePredicate.IsValid()) {
pushable.emplace_back(pushablePredicate);
} else if (predicateTree.Op == NPushdown::EBoolOp::Or) {
// One of the OR branch was invalid, so the whole predicate is invalid
return NPushdown::TPredicateNode();
}
}

NPushdown::TPredicateNode predicateToPush;
predicateToPush.SetPredicates(pushable, ctx, pos);
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
return predicateToPush;
}

Expand All @@ -51,7 +54,7 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), settings);
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");

NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos);
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings);
if (!predicateToPush.IsValid()) {
return {};
}
Expand Down
44 changes: 28 additions & 16 deletions ydb/library/yql/providers/common/pushdown/predicate_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,38 @@ bool TPredicateNode::IsValid() const {
return res && ExprNode.IsValid();
}

void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) {
auto predicatesSize = predicates.size();
if (predicatesSize == 0) {
return;
} else if (predicatesSize == 1) {
}
if (predicatesSize == 1) {
*this = predicates[0];
} else {
Op = EBoolOp::And;
Children = predicates;
CanBePushed = true;

TVector<NNodes::TExprBase> exprNodes;
exprNodes.reserve(predicatesSize);
for (auto& pred : predicates) {
exprNodes.emplace_back(pred.ExprNode.Cast());
CanBePushed &= pred.CanBePushed;
}
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
.Add(exprNodes)
.Done();
return;
}

Op = op;
Children = predicates;
CanBePushed = true;

TVector<NNodes::TExprBase> exprNodes;
exprNodes.reserve(predicatesSize);
for (auto& pred : predicates) {
exprNodes.emplace_back(pred.ExprNode.Cast());
CanBePushed &= pred.CanBePushed;
}

switch (op) {
case EBoolOp::And:
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos).Add(exprNodes).Done();
break;

case EBoolOp::Or:
ExprNode = NNodes::Build<NNodes::TCoOr>(ctx, pos).Add(exprNodes).Done();
break;

default:
throw yexception() << "Unsupported operator for predicate node creation: " << static_cast<int>(op);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/common/pushdown/predicate_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ struct TPredicateNode {
~TPredicateNode();

bool IsValid() const;
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos);
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op);

NNodes::TMaybeNode<NNodes::TExprBase> ExprNode;
std::vector<TPredicateNode> Children;
Expand Down
10 changes: 9 additions & 1 deletion ydb/library/yql/providers/common/pushdown/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@ struct TSettings {
TimestampCtor = 1 << 17,
JustPassthroughOperators = 1 << 18, // if + coalesce + just
InOperator = 1 << 19, // IN()
IsDistinctOperator = 1 << 20 // IS NOT DISTINCT FROM / IS DISTINCT FROM
IsDistinctOperator = 1 << 20, // IS NOT DISTINCT FROM / IS DISTINCT FROM

// Option which enables partial pushdown for sequence of OR
// For example next predicate:
// ($A AND $B) OR ($C AND $D)
// May be partially pushdowned as:
// $A OR $C
// In case of unsupported / complicated expressions $B and $D
SplitOrOperator = 1 << 21
};

explicit TSettings(NLog::EComponent logComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ namespace NYql {
}

#undef MATCH_ATOM
#undef MATCH_ARITHMETICAL

#define EXPR_NODE_TO_COMPARE_TYPE(TExprNodeType, COMPARE_TYPE) \
if (!opMatched && compare.Maybe<TExprNodeType>()) { \
Expand All @@ -118,7 +119,7 @@ namespace NYql {
EXPR_NODE_TO_COMPARE_TYPE(TCoAggrNotEqual, ID);

if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) {
err << "unknown operation: " << compare.Raw()->Content();
err << "unknown compare operation: " << compare.Raw()->Content();
return false;
}
return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err);
Expand Down Expand Up @@ -181,7 +182,7 @@ namespace NYql {
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
collection = maybeAsList.Cast().Ptr();
} else {
err << "unknown operation: " << expr.Ref().Content();
err << "unknown source for in: " << expr.Ref().Content();
return false;
}

Expand All @@ -195,7 +196,7 @@ namespace NYql {

bool SerializeIsNotDistinctFrom(const TExprBase& predicate, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err, bool invert) {
if (predicate.Ref().ChildrenSize() != 2) {
err << "unknown predicate, expected 2, children size " << predicate.Ref().ChildrenSize();
err << "invalid IsNotDistinctFrom predicate, expected 2 children but got " << predicate.Ref().ChildrenSize();
return false;
}
TPredicate::TComparison* proto = predicateProto->mutable_comparison();
Expand Down Expand Up @@ -356,7 +357,7 @@ namespace NYql {

auto left = FormatExpression(expression.left_value());
auto right = FormatExpression(expression.right_value());
return left + operation + right;
return TStringBuilder() << "(" << left << operation << right << ")";
}

TString FormatNegation(const TPredicate_TNegation& negation) {
Expand Down Expand Up @@ -524,14 +525,22 @@ namespace NYql {

TString FormatIn(const TPredicate_TIn& in) {
auto value = FormatExpression(in.value());
TString list;
TStringStream list;
for (const auto& expr : in.set()) {
if (!list.empty()) {
list += ",";
list << ", ";
} else {
list << value << " IN (";
}
list += FormatExpression(expr);
list << FormatExpression(expr);
}
return value + " IN (" + list + ")";

if (list.empty()) {
throw yexception() << "failed to format IN statement, no operands";
}

list << ")";
return list.Str();
}

TString FormatPredicate(const TPredicate& predicate, bool topLevel ) {
Expand Down
10 changes: 9 additions & 1 deletion ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ namespace {
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
{
using EFlag = NPushdown::TSettings::EFeatureFlag;
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators);
Enable(
// Operator features
EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 |
EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator |
EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators |

// Split features
EFlag::SplitOrOperator
);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
}
}
NPushdown::TPredicateNode predicateToPush;
predicateToPush.SetPredicates(pushable, ctx, pos);
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
return predicateToPush;
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def test_filters_non_optional_field(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")
self.init_topics("test_filters_non_optional_field")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand Down Expand Up @@ -334,7 +334,7 @@ def test_filters_optional_field(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_filter")
self.init_topics("test_filters_optional_field")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
Expand All @@ -348,12 +348,16 @@ def test_filters_optional_field(self, kikimr, client):
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
filter = 'flag'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`')
filter = 'time * (field2 - field1) != 0'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` * (`field2` - `field1`)) <> 0')
filter = ' event IS NOT DISTINCT FROM "event2"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"')
filter = ' event IS DISTINCT FROM "event1"'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
filter = ' field1 IS DISTINCT FROM field2'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `field1` IS DISTINCT FROM `field2`')
filter = 'time == 102 OR (field2 IS NOT DISTINCT FROM 1005 AND Random(field1) < 10.0)'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` = 102 OR `field2` IS NOT DISTINCT FROM 1005)')
filter = 'event IN ("event2")'
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
Expand Down

0 comments on commit 3235631

Please sign in to comment.