From 6595ec016cbfd90d6f4cb428bb4eb7b5631a528d Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy Date: Fri, 11 Oct 2024 12:46:12 +0300 Subject: [PATCH] YQ-3722 RD fixed nested type parsing (#10300) --- .../fq/libs/row_dispatcher/json_filter.cpp | 8 ++- .../fq/libs/row_dispatcher/json_parser.cpp | 67 ++++++++++++++----- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 53 ++++++++++++++- ydb/tests/fq/yds/test_row_dispatcher.py | 31 +++++++++ 4 files changed, 141 insertions(+), 18 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index b432494d6623..d43bc2fbf365 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -258,7 +258,13 @@ class TJsonFilter::TImpl { Y_ABORT_UNLESS(columnNames.size() == columnTypes.size()); str << OffsetFieldName << ", "; for (size_t i = 0; i < columnNames.size(); ++i) { - str << "CAST(" << columnNames[i] << " as " << columnTypes[i] << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : ""); + TString columnType = columnTypes[i]; + if (columnType == "Json") { + columnType = "String"; + } else if (columnType == "Optional<Json>") { + columnType = "Optional<String>"; + } + str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? 
"," : ""); } str << " FROM Input;\n"; str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n"; diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index f0e9ab7122d6..782505f91fb3 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -57,20 +57,28 @@ void TJsonParserBuffer::Clear() { //// TJsonParser class TJsonParser::TImpl { + struct TColumnDescription { + std::string Name; + TString Type; + }; + public: TImpl(const TVector<TString>& columns, const TVector<TString>& types) : ParsedValues(columns.size()) { - Y_UNUSED(types); // TODO: Will be used for UV creation + Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); Columns.reserve(columns.size()); - for (const auto& column : columns) { - Columns.emplace_back(column); + for (size_t i = 0; i < columns.size(); i++) { + Columns.emplace_back(TColumnDescription{ + .Name = columns[i], + .Type = SkipOptional(types[i]) + }); } ColumnsIndex.reserve(columns.size()); - for (size_t i = 0; i < Columns.size(); i++) { - ColumnsIndex.emplace(std::string_view(Columns[i]), i); + for (size_t i = 0; i < columns.size(); i++) { + ColumnsIndex.emplace(std::string_view(Columns[i].Name), i); } } @@ -86,6 +94,7 @@ class TJsonParser::TImpl { simdjson::ondemand::parser parser; parser.threaded = false; + size_t rowId = 0; simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE); for (auto document : documents) { for (auto item : document.get_object()) { @@ -94,17 +103,33 @@ class TJsonParser::TImpl { continue; } - auto& parsedColumn = ParsedValues[it->second]; - if (item.value().is_string()) { - parsedColumn.emplace_back(CreateHolderIfNeeded( - values, size, item.value().get_string().value() - )); + const auto& column = Columns[it->second]; + + std::string_view value; + if (item.value().is_null()) { + // TODO: support optional types 
and create UV + continue; + } else if (column.Type == "Json") { + value = item.value().raw_json().value(); + } else if (column.Type == "String" || column.Type == "Utf8") { + value = item.value().get_string().value(); + } else if (item.value().is_scalar()) { + // TODO: perform type validation and create UV + value = item.value().raw_json_token().value(); } else { - parsedColumn.emplace_back(CreateHolderIfNeeded( - values, size, item.value().raw_json_token().value() - )); + throw yexception() << "Failed to parse json string, expected scalar type for column '" << it->first << "' with type " << column.Type << " but got nested json, please change column type to Json."; } + + auto& parsedColumn = ParsedValues[it->second]; + parsedColumn.resize(rowId); + parsedColumn.emplace_back(CreateHolderIfNeeded(values, size, value)); } + rowId++; + } + Y_ENSURE(rowId == Buffer.GetNumberValues(), "Unexpected number of json documents"); + + for (auto& parsedColumn : ParsedValues) { + parsedColumn.resize(Buffer.GetNumberValues()); } return ParsedValues; } @@ -119,7 +144,7 @@ TString GetDescription() const { TStringBuilder description = TStringBuilder() << "Columns: "; for (const auto& column : Columns) { - description << "'" << column << "' "; + description << "'" << column.Name << "':" << column.Type << " "; } description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished(); return description; @@ -128,7 +153,7 @@ TString GetDebugString(const TVector<TVector<std::string_view>>& parsedValues) const { TStringBuilder result; for (size_t i = 0; i < Columns.size(); ++i) { - result << "Parsed column '" << Columns[i] << "': "; + result << "Parsed column '" << Columns[i].Name << "': "; for (const auto& value : parsedValues[i]) { result << "'" << value << "' "; } @@ -146,8 +171,18 @@ return Buffer.AddHolder(value); } + static TString SkipOptional(const TString& type) { + if 
(type.StartsWith("Optional")) { + TStringBuf optionalType = type; + Y_ENSURE(optionalType.SkipPrefix("Optional<"), "Unexpected type"); + Y_ENSURE(optionalType.ChopSuffix(">"), "Unexpected type"); + return TString(optionalType); + } + return type; + } + private: - TVector<TString> Columns; + TVector<TColumnDescription> Columns; absl::flat_hash_map<std::string_view, size_t> ColumnsIndex; TJsonParserBuffer Buffer; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 54fde6580784..7a71a19af4ba 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -118,7 +118,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { MakeParser({"a1", "a2"}); TJsonParserBuffer& buffer = Parser->GetBuffer(); - buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})"); + buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})"); buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})"); buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})"); @@ -133,6 +133,57 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { } } + Y_UNIT_TEST_F(MissingFields, TFixture) { + MakeParser({"a1", "a2"}); + + TJsonParserBuffer& buffer = Parser->GetBuffer(); + buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})"); + buffer.AddValue(R"({"a1": "hello1", "event": "event2"})"); + buffer.AddValue(R"({"a2": "101", "a1": null, "event": "event3"})"); + + ParsedValues = Parser->Parse(); + ResultNumberValues = ParsedValues.front().size(); + UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); + for (size_t i = 0; i < ResultNumberValues; ++i) { + const auto& result = GetParsedRow(i); + UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i); + UNIT_ASSERT_VALUES_EQUAL_C(i != 2 ? "hello1" : "", result.front(), i); + UNIT_ASSERT_VALUES_EQUAL_C(i != 1 ? 
"101" : "", result.back(), i); + } + } + + Y_UNIT_TEST_F(NestedTypes, TFixture) { + MakeParser({"nested", "a1"}, {"Optional<Json>", "String"}); + + TJsonParserBuffer& buffer = Parser->GetBuffer(); + buffer.AddValue(R"({"a1": "hello1", "nested": {"key": "value"}})"); + buffer.AddValue(R"({"a1": "hello1", "nested": ["key1", "key2"]})"); + + ParsedValues = Parser->Parse(); + ResultNumberValues = ParsedValues.front().size(); + UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); + + const auto& nestedJson = GetParsedRow(0); + UNIT_ASSERT_VALUES_EQUAL(2, nestedJson.size()); + UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", nestedJson.front()); + UNIT_ASSERT_VALUES_EQUAL("hello1", nestedJson.back()); + + const auto& nestedList = GetParsedRow(1); + UNIT_ASSERT_VALUES_EQUAL(2, nestedList.size()); + UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", nestedList.front()); + UNIT_ASSERT_VALUES_EQUAL("hello1", nestedList.back()); + } + + Y_UNIT_TEST_F(StringTypeValidation, TFixture) { + MakeParser({"a1"}, {"String"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": 1234})"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); + } + + Y_UNIT_TEST_F(JsonTypeValidation, TFixture) { + MakeParser({"a1"}, {"Int32"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": {"key": "value"}})"), yexception, "Failed to parse json string, expected scalar type for column 'a1' with type Int32 but got nested json, please change column type to Json."); + } + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeParser({"a2", "a1"}); UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 3b1886d457fa..083d6fa5aa28 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -218,6 +218,37 @@ def 
test_scheme_error(self, kikimr, client): stop_yds_query(client, query_id) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0) + @yq_v1 + def test_nested_types(self, kikimr, client): + client.create_yds_connection( + YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True + ) + self.init_topics("test_nested_types") + + sql = Rf''' + INSERT INTO {YDS_CONNECTION}.`{self.output_topic}` + SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}` + WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL)) + WHERE event = "event1" or event = "event2";''' + + query_id = start_yds_query(kikimr, client, sql) + wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + + data = [ + '{"time": 101, "data": {"key": "value"}, "event": "event1"}', + '{"time": 102, "data": ["key1", "key2"], "event": "event2"}', + ] + + self.write_stream(data) + expected = [ + '{"key": "value"}', + '["key1", "key2"]' + ] + assert self.read_stream(len(expected), topic_path=self.output_topic) == expected + + wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1) + stop_yds_query(client, query_id) + @yq_v1 def test_filter(self, kikimr, client): client.create_yds_connection(