From cd072fe1abd50e8f67d376ceda8eacac79d2ac17 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 15 Oct 2024 12:28:04 +0000 Subject: [PATCH] Supported optional columns as nulls --- .../fq/libs/row_dispatcher/json_filter.cpp | 21 ++++++++++++++++--- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 18 ++++++++++++++++ ydb/tests/fq/yds/test_row_dispatcher.py | 15 +++++++++---- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 3ef0da86e6d5..30ee6e6b50bd 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -16,11 +16,24 @@ using TCallback = NFq::TJsonFilter::TCallback; const char* OffsetFieldName = "_offset"; TString LogPrefix = "JsonFilter: "; +NYT::TNode CreateTypeNode(const TString& fieldType) { + return NYT::TNode::CreateList() + .Add("DataType") + .Add(fieldType); +} + void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { node.Add( NYT::TNode::CreateList() .Add(fieldName) - .Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType)) + .Add(CreateTypeNode(fieldType)) + ); +} + +void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { + node.Add(NYT::TNode::CreateList() + .Add(fieldName) + .Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType))) ); } @@ -28,7 +41,7 @@ NYT::TNode MakeInputSchema(const TVector& columns) { auto structMembers = NYT::TNode::CreateList(); AddField(structMembers, OffsetFieldName, "Uint64"); for (const auto& col : columns) { - AddField(structMembers, col, "String"); + AddOptionalField(structMembers, col, "String"); } return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); } @@ -112,7 +125,9 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumerPush(std::move(result)); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 7bda6cd4c1fe..9c2305711715 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -23,6 +25,8 @@ class TFixture : public NUnitTest::TBaseFixture { TAutoPtr app = new TAppPrepare(); Runtime.Initialize(app->Unwrap()); Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); + + NKikimr::EnableYDBBacktraceFormat(); } void TearDown(NUnitTest::TTestContext& /* context */) override { @@ -91,6 +95,20 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } + Y_UNIT_TEST_F(NullValues, TFixture) { + TMap result; + MakeFilter( + {"a1"}, + {"Optional"}, + "where a1 is null", + [&](ui64 offset, const TString& json) { + result[offset] = json; + }); + Filter->Push({5}, {{std::string_view()}}); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null})", result[5]); + } + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeFilter( {"a1", "a2"}, diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 61b760f3d98b..ad40c3ed3ff5 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -155,7 +155,6 @@ def test_simple_not_null(self, kikimr, client): wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) @yq_v1 - @pytest.mark.skip(reason="Is not implemented") def test_simple_optional(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True @@ -249,6 +248,9 @@ def test_nested_types(self, kikimr, client): wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) stop_yds_query(client, query_id) + issues = str(client.describe_query(query_id).result.query.transient_issue) + assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues + @yq_v1 def test_filter(self, kikimr, client): client.create_yds_connection( @@ -296,7 +298,7 @@ def test_filter_missing_fields(self, kikimr, client): INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}` WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL)) - WHERE data = "";''' + WHERE data IS NULL;''' query_id = start_yds_query(kikimr, client, sql) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) @@ -304,6 +306,8 @@ def test_filter_missing_fields(self, kikimr, client): data = [ '{"time": 101, "event": "event1"}', '{"time": 102, "data": null, "event": "event2"}', + '{"time": 103, "data": "", "event": "event2"}', + '{"time": 104, "data": "null", "event": "event2"}', ] self.write_stream(data) @@ -313,6 +317,9 @@ def test_filter_missing_fields(self, kikimr, client): wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) stop_yds_query(client, query_id) + issues = str(client.describe_query(query_id).result.query.transient_issue) + assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues + @yq_v1 def test_filter_use_unsupported_predicate(self, kikimr, client): client.create_yds_connection( @@ -525,9 +532,9 @@ def test_stop_start_with_filter(self, kikimr, client): client.create_yds_connection( YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True ) - self.init_topics("test_stop_start", create_output=False) + self.init_topics("test_stop_start_with_filter", create_output=False) - output_topic = "test_stop_start" + output_topic = "test_stop_start_with_filter" create_stream(output_topic, partitions_count=1) create_read_rule(output_topic, self.consumer_name)