From 32356313e64d937374f52aa40fabedea5715a1d1 Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Mon, 11 Nov 2024 17:15:19 +0300 Subject: [PATCH] YQ-3846 RD support OR split during pushdown (#11439) --- .../common/pushdown/physical_opt.cpp | 23 +++++----- .../common/pushdown/predicate_node.cpp | 44 ++++++++++++------- .../common/pushdown/predicate_node.h | 2 +- .../yql/providers/common/pushdown/settings.h | 10 ++++- .../yql_generic_predicate_pushdown.cpp | 25 +++++++---- .../pq/provider/yql_pq_logical_opt.cpp | 10 ++++- .../s3/provider/yql_s3_logical_opt.cpp | 2 +- ydb/tests/fq/yds/test_row_dispatcher.py | 8 +++- 8 files changed, 84 insertions(+), 40 deletions(-) diff --git a/ydb/library/yql/providers/common/pushdown/physical_opt.cpp b/ydb/library/yql/providers/common/pushdown/physical_opt.cpp index 602ab6c8f0b8..304c647e0159 100644 --- a/ydb/library/yql/providers/common/pushdown/physical_opt.cpp +++ b/ydb/library/yql/providers/common/pushdown/physical_opt.cpp @@ -11,26 +11,29 @@ using namespace NNodes; namespace { -TPredicateNode SplitForPartialPushdown( - const NPushdown::TPredicateNode& predicateTree, - TExprContext& ctx, - TPositionHandle pos) { +TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, TExprContext& ctx, TPositionHandle pos, const TSettings& settings) { if (predicateTree.CanBePushed) { return predicateTree; } - if (predicateTree.Op != NPushdown::EBoolOp::And) { - return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer + if (predicateTree.Op != NPushdown::EBoolOp::And && (!settings.IsEnabled(TSettings::EFeatureFlag::SplitOrOperator) || predicateTree.Op != NPushdown::EBoolOp::Or)) { + // Predicate can't be split, so return invalid value and skip this branch + return NPushdown::TPredicateNode(); } std::vector pushable; for (auto& predicate : predicateTree.Children) { - if (predicate.CanBePushed) { - pushable.emplace_back(predicate); + NPushdown::TPredicateNode pushablePredicate = SplitForPartialPushdown(predicate, ctx, pos, settings); + if (pushablePredicate.IsValid()) { + pushable.emplace_back(pushablePredicate); + } else if (predicateTree.Op == NPushdown::EBoolOp::Or) { + // One of the OR branch was invalid, so the whole predicate is invalid + return NPushdown::TPredicateNode(); } } + NPushdown::TPredicateNode predicateToPush; - predicateToPush.SetPredicates(pushable, ctx, pos); + predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op); return predicateToPush; } @@ -51,7 +54,7 @@ TMaybeNode MakePushdownPredicate(const TCoLambda& lambda, TExprContex NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), settings); YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid"); - NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos); + NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings); if (!predicateToPush.IsValid()) { return {}; } diff --git a/ydb/library/yql/providers/common/pushdown/predicate_node.cpp b/ydb/library/yql/providers/common/pushdown/predicate_node.cpp index 68bd089b81a9..0f506e83b4e1 100644 --- a/ydb/library/yql/providers/common/pushdown/predicate_node.cpp +++ b/ydb/library/yql/providers/common/pushdown/predicate_node.cpp @@ -30,26 +30,38 @@ bool TPredicateNode::IsValid() const { return res && ExprNode.IsValid(); } -void TPredicateNode::SetPredicates(const std::vector& predicates, TExprContext& ctx, TPositionHandle pos) { +void TPredicateNode::SetPredicates(const std::vector& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) { auto predicatesSize = predicates.size(); if (predicatesSize == 0) { return; - } else if (predicatesSize == 1) { + } + if (predicatesSize == 1) { *this = predicates[0]; - } else { - Op = EBoolOp::And; - Children = predicates; - CanBePushed = true; - - TVector exprNodes; - exprNodes.reserve(predicatesSize); - for (auto& pred : predicates) { - exprNodes.emplace_back(pred.ExprNode.Cast()); - CanBePushed &= pred.CanBePushed; - } - ExprNode = NNodes::Build(ctx, pos) - .Add(exprNodes) - .Done(); + return; + } + + Op = op; + Children = predicates; + CanBePushed = true; + + TVector exprNodes; + exprNodes.reserve(predicatesSize); + for (auto& pred : predicates) { + exprNodes.emplace_back(pred.ExprNode.Cast()); + CanBePushed &= pred.CanBePushed; + } + + switch (op) { + case EBoolOp::And: + ExprNode = NNodes::Build(ctx, pos).Add(exprNodes).Done(); + break; + + case EBoolOp::Or: + ExprNode = NNodes::Build(ctx, pos).Add(exprNodes).Done(); + break; + + default: + throw yexception() << "Unsupported operator for predicate node creation: " << static_cast(op); } } diff --git a/ydb/library/yql/providers/common/pushdown/predicate_node.h b/ydb/library/yql/providers/common/pushdown/predicate_node.h index 06a167b60bc9..bf93b8ac7cb7 100644 --- a/ydb/library/yql/providers/common/pushdown/predicate_node.h +++ b/ydb/library/yql/providers/common/pushdown/predicate_node.h @@ -23,7 +23,7 @@ struct TPredicateNode { ~TPredicateNode(); bool IsValid() const; - void SetPredicates(const std::vector& predicates, TExprContext& ctx, TPositionHandle pos); + void SetPredicates(const std::vector& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op); NNodes::TMaybeNode ExprNode; std::vector Children; diff --git a/ydb/library/yql/providers/common/pushdown/settings.h b/ydb/library/yql/providers/common/pushdown/settings.h index df0164b9d5fa..acae35b517b4 100644 --- a/ydb/library/yql/providers/common/pushdown/settings.h +++ b/ydb/library/yql/providers/common/pushdown/settings.h @@ -27,7 +27,15 @@ struct TSettings { TimestampCtor = 1 << 17, JustPassthroughOperators = 1 << 18, // if + coalesce + just InOperator = 1 << 19, // IN() - IsDistinctOperator = 1 << 20 // IS NOT DISTINCT FROM / IS DISTINCT FROM + IsDistinctOperator = 1 << 20, // IS NOT DISTINCT FROM / IS DISTINCT FROM + + // Option which enables partial pushdown for sequence of OR + // For example next predicate: + // ($A AND $B) OR ($C AND $D) + // May be partially pushdowned as: + // $A OR $C + // In case of unsupported / complicated expressions $B and $D + SplitOrOperator = 1 << 21 }; explicit TSettings(NLog::EComponent logComponent) diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp index 251ce64380b8..2b0d72c27494 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp @@ -97,6 +97,7 @@ namespace NYql { } #undef MATCH_ATOM +#undef MATCH_ARITHMETICAL #define EXPR_NODE_TO_COMPARE_TYPE(TExprNodeType, COMPARE_TYPE) \ if (!opMatched && compare.Maybe()) { \ @@ -118,7 +119,7 @@ namespace NYql { EXPR_NODE_TO_COMPARE_TYPE(TCoAggrNotEqual, ID); if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) { - err << "unknown operation: " << compare.Raw()->Content(); + err << "unknown compare operation: " << compare.Raw()->Content(); return false; } return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err); @@ -181,7 +182,7 @@ namespace NYql { } else if (auto maybeAsList = expr.Maybe()) { collection = maybeAsList.Cast().Ptr(); } else { - err << "unknown operation: " << expr.Ref().Content(); + err << "unknown source for in: " << expr.Ref().Content(); return false; } @@ -195,7 +196,7 @@ namespace NYql { bool SerializeIsNotDistinctFrom(const TExprBase& predicate, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err, bool invert) { if (predicate.Ref().ChildrenSize() != 2) { - err << "unknown predicate, expected 2, children size " << predicate.Ref().ChildrenSize(); + err << "invalid IsNotDistinctFrom predicate, expected 2 children but got " << predicate.Ref().ChildrenSize(); return false; } TPredicate::TComparison* proto = predicateProto->mutable_comparison(); @@ -356,7 +357,7 @@ namespace NYql { auto left = FormatExpression(expression.left_value()); auto right = FormatExpression(expression.right_value()); - return left + operation + right; + return TStringBuilder() << "(" << left << operation << right << ")"; } TString FormatNegation(const TPredicate_TNegation& negation) { @@ -524,14 +525,22 @@ namespace NYql { TString FormatIn(const TPredicate_TIn& in) { auto value = FormatExpression(in.value()); - TString list; + TStringStream list; for (const auto& expr : in.set()) { if (!list.empty()) { - list += ","; + list << ", "; + } else { + list << value << " IN ("; } - list += FormatExpression(expr); + list << FormatExpression(expr); } - return value + " IN (" + list + ")"; + + if (list.empty()) { + throw yexception() << "failed to format IN statement, no operands"; + } + + list << ")"; + return list.Str(); } TString FormatPredicate(const TPredicate& predicate, bool topLevel ) { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp index da8fa9923fd6..818f6647cb13 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp @@ -30,7 +30,15 @@ namespace { : NPushdown::TSettings(NLog::EComponent::ProviderGeneric) { using EFlag = NPushdown::TSettings::EFeatureFlag; - Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators); + Enable( + // Operator features + EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | + EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | + EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators | + + // Split features + EFlag::SplitOrOperator + ); } }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index 30486010f309..92f913027a0e 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -286,7 +286,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { } } NPushdown::TPredicateNode predicateToPush; - predicateToPush.SetPredicates(pushable, ctx, pos); + predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op); return predicateToPush; } diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 3814edc7ada2..0cbbfc2845ed 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -302,7 +302,7 @@ def test_filters_non_optional_field(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_filter") + self.init_topics("test_filters_non_optional_field") sql = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -334,7 +334,7 @@ def test_filters_optional_field(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_filter") + self.init_topics("test_filters_optional_field") sql = Rf''' INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` @@ -348,12 +348,16 @@ def test_filters_optional_field(self, kikimr, client): self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"') filter = 'flag' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`') + filter = 'time * (field2 - field1) != 0' + self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` * (`field2` - `field1`)) <> 0') filter = ' event IS NOT DISTINCT FROM "event2"' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"') filter = ' event IS DISTINCT FROM "event1"' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"') filter = ' field1 IS DISTINCT FROM field2' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `field1` IS DISTINCT FROM `field2`') + filter = 'time == 102 OR (field2 IS NOT DISTINCT FROM 1005 AND Random(field1) < 10.0)' + self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` = 102 OR `field2` IS NOT DISTINCT FROM 1005)') filter = 'event IN ("event2")' self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")') filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'