Skip to content

Commit

Permalink
Supported optional columns as nulls
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Oct 15, 2024
1 parent eca5b6e commit cd072fe
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
21 changes: 18 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,32 @@ using TCallback = NFq::TJsonFilter::TCallback;
const char* OffsetFieldName = "_offset";
TString LogPrefix = "JsonFilter: ";

NYT::TNode CreateTypeNode(const TString& fieldType) {
return NYT::TNode::CreateList()
.Add("DataType")
.Add(fieldType);
}

void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(
NYT::TNode::CreateList()
.Add(fieldName)
.Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType))
.Add(CreateTypeNode(fieldType))
);
}

void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(NYT::TNode::CreateList()
.Add(fieldName)
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
);
}

NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
auto structMembers = NYT::TNode::CreateList();
AddField(structMembers, OffsetFieldName, "Uint64");
for (const auto& col : columns) {
AddField(structMembers, col, "String");
AddOptionalField(structMembers, col, "String");
}
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
}
Expand Down Expand Up @@ -112,7 +125,9 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T

size_t fieldId = 0;
for (const auto& column : values.second) {
items[FieldsPositions[fieldId++]] = NKikimr::NMiniKQL::MakeString(column[rowId]);
items[FieldsPositions[fieldId++]] = column[rowId].data()
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
: NKikimr::NUdf::TUnboxedValuePod();
}

Worker->Push(std::move(result));
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <ydb/core/base/backtrace.h>

#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/core/fq/libs/events/events.h>

Expand All @@ -23,6 +25,8 @@ class TFixture : public NUnitTest::TBaseFixture {
TAutoPtr<TAppPrepare> app = new TAppPrepare();
Runtime.Initialize(app->Unwrap());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);

NKikimr::EnableYDBBacktraceFormat();
}

void TearDown(NUnitTest::TTestContext& /* context */) override {
Expand Down Expand Up @@ -91,6 +95,20 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) {
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]);
}

Y_UNIT_TEST_F(NullValues, TFixture) {
TMap<ui64, TString> result;
MakeFilter(
{"a1"},
{"Optional<String>"},
"where a1 is null",
[&](ui64 offset, const TString& json) {
result[offset] = json;
});
Filter->Push({5}, {{std::string_view()}});
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null})", result[5]);
}

Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
MakeFilter(
{"a1", "a2"},
Expand Down
15 changes: 11 additions & 4 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ def test_simple_not_null(self, kikimr, client):
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

@yq_v1
@pytest.mark.skip(reason="Is not implemented")
def test_simple_optional(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
Expand Down Expand Up @@ -249,6 +248,9 @@ def test_nested_types(self, kikimr, client):
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
stop_yds_query(client, query_id)

issues = str(client.describe_query(query_id).result.query.transient_issue)
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues

@yq_v1
def test_filter(self, kikimr, client):
client.create_yds_connection(
Expand Down Expand Up @@ -296,14 +298,16 @@ def test_filter_missing_fields(self, kikimr, client):
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String NOT NULL))
WHERE data = "";'''
WHERE data IS NULL;'''

query_id = start_yds_query(kikimr, client, sql)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

data = [
'{"time": 101, "event": "event1"}',
'{"time": 102, "data": null, "event": "event2"}',
'{"time": 103, "data": "", "event": "event2"}',
'{"time": 104, "data": "null", "event": "event2"}',
]

self.write_stream(data)
Expand All @@ -313,6 +317,9 @@ def test_filter_missing_fields(self, kikimr, client):
wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
stop_yds_query(client, query_id)

issues = str(client.describe_query(query_id).result.query.transient_issue)
assert "Row dispatcher will use the predicate:" in issues, "Incorrect Issues: " + issues

@yq_v1
def test_filter_use_unsupported_predicate(self, kikimr, client):
client.create_yds_connection(
Expand Down Expand Up @@ -525,9 +532,9 @@ def test_stop_start_with_filter(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_stop_start", create_output=False)
self.init_topics("test_stop_start_with_filter", create_output=False)

output_topic = "test_stop_start"
output_topic = "test_stop_start_with_filter"
create_stream(output_topic, partitions_count=1)
create_read_rule(output_topic, self.consumer_name)

Expand Down

0 comments on commit cd072fe

Please sign in to comment.