Skip to content

Commit

Permalink
YQ-3722 RD fixed nested type parsing (ydb-platform#10300)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Oct 11, 2024
1 parent 4da79fb commit 6595ec0
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 18 deletions.
8 changes: 7 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,13 @@ class TJsonFilter::TImpl {
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
str << OffsetFieldName << ", ";
for (size_t i = 0; i < columnNames.size(); ++i) {
str << "CAST(" << columnNames[i] << " as " << columnTypes[i] << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
TString columnType = columnTypes[i];
if (columnType == "Json") {
columnType = "String";
} else if (columnType == "Optional<Json>") {
columnType = "Optional<String>";
}
str << "CAST(" << columnNames[i] << " as " << columnType << ") as " << columnNames[i] << ((i != columnNames.size() - 1) ? "," : "");
}
str << " FROM Input;\n";
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
Expand Down
67 changes: 51 additions & 16 deletions ydb/core/fq/libs/row_dispatcher/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,28 @@ void TJsonParserBuffer::Clear() {
//// TJsonParser

class TJsonParser::TImpl {
struct TColumnDescription {
std::string Name;
TString Type;
};

public:
TImpl(const TVector<TString>& columns, const TVector<TString>& types)
: ParsedValues(columns.size())
{
Y_UNUSED(types); // TODO: Will be used for UV creation
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");

Columns.reserve(columns.size());
for (const auto& column : columns) {
Columns.emplace_back(column);
for (size_t i = 0; i < columns.size(); i++) {
Columns.emplace_back(TColumnDescription{
.Name = columns[i],
.Type = SkipOptional(types[i])
});
}

ColumnsIndex.reserve(columns.size());
for (size_t i = 0; i < Columns.size(); i++) {
ColumnsIndex.emplace(std::string_view(Columns[i]), i);
for (size_t i = 0; i < columns.size(); i++) {
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
}
}

Expand All @@ -86,6 +94,7 @@ class TJsonParser::TImpl {
simdjson::ondemand::parser parser;
parser.threaded = false;

size_t rowId = 0;
simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE);
for (auto document : documents) {
for (auto item : document.get_object()) {
Expand All @@ -94,17 +103,33 @@ class TJsonParser::TImpl {
continue;
}

auto& parsedColumn = ParsedValues[it->second];
if (item.value().is_string()) {
parsedColumn.emplace_back(CreateHolderIfNeeded(
values, size, item.value().get_string().value()
));
const auto& column = Columns[it->second];

std::string_view value;
if (item.value().is_null()) {
// TODO: support optional types and create UV
continue;
} else if (column.Type == "Json") {
value = item.value().raw_json().value();
} else if (column.Type == "String" || column.Type == "Utf8") {
value = item.value().get_string().value();
} else if (item.value().is_scalar()) {
// TODO: perform type validation and create UV
value = item.value().raw_json_token().value();
} else {
parsedColumn.emplace_back(CreateHolderIfNeeded(
values, size, item.value().raw_json_token().value()
));
throw yexception() << "Failed to parse json string, expected scalar type for column '" << it->first << "' with type " << column.Type << " but got nested json, please change column type to Json.";
}

auto& parsedColumn = ParsedValues[it->second];
parsedColumn.resize(rowId);
parsedColumn.emplace_back(CreateHolderIfNeeded(values, size, value));
}
rowId++;
}
Y_ENSURE(rowId == Buffer.GetNumberValues(), "Unexpected number of json documents");

for (auto& parsedColumn : ParsedValues) {
parsedColumn.resize(Buffer.GetNumberValues());
}
return ParsedValues;
}
Expand All @@ -119,7 +144,7 @@ class TJsonParser::TImpl {
TString GetDescription() const {
TStringBuilder description = TStringBuilder() << "Columns: ";
for (const auto& column : Columns) {
description << "'" << column << "' ";
description << "'" << column.Name << "':" << column.Type << " ";
}
description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished();
return description;
Expand All @@ -128,7 +153,7 @@ class TJsonParser::TImpl {
TString GetDebugString(const TVector<TVector<std::string_view>>& parsedValues) const {
TStringBuilder result;
for (size_t i = 0; i < Columns.size(); ++i) {
result << "Parsed column '" << Columns[i] << "': ";
result << "Parsed column '" << Columns[i].Name << "': ";
for (const auto& value : parsedValues[i]) {
result << "'" << value << "' ";
}
Expand All @@ -146,8 +171,18 @@ class TJsonParser::TImpl {
return Buffer.AddHolder(value);
}

static TString SkipOptional(const TString& type) {
if (type.StartsWith("Optional")) {
TStringBuf optionalType = type;
Y_ENSURE(optionalType.SkipPrefix("Optional<"), "Unexpected type");
Y_ENSURE(optionalType.ChopSuffix(">"), "Unexpected type");
return TString(optionalType);
}
return type;
}

private:
TVector<std::string> Columns;
TVector<TColumnDescription> Columns;
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;

TJsonParserBuffer Buffer;
Expand Down
53 changes: 52 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
MakeParser({"a1", "a2"});

TJsonParserBuffer& buffer = Parser->GetBuffer();
buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})");
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})");
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})");
buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})");

Expand All @@ -133,6 +133,57 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
}
}

Y_UNIT_TEST_F(MissingFields, TFixture) {
MakeParser({"a1", "a2"});

TJsonParserBuffer& buffer = Parser->GetBuffer();
buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event1"})");
buffer.AddValue(R"({"a1": "hello1", "event": "event2"})");
buffer.AddValue(R"({"a2": "101", "a1": null, "event": "event3"})");

ParsedValues = Parser->Parse();
ResultNumberValues = ParsedValues.front().size();
UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues);
for (size_t i = 0; i < ResultNumberValues; ++i) {
const auto& result = GetParsedRow(i);
UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i);
UNIT_ASSERT_VALUES_EQUAL_C(i != 2 ? "hello1" : "", result.front(), i);
UNIT_ASSERT_VALUES_EQUAL_C(i != 1 ? "101" : "", result.back(), i);
}
}

Y_UNIT_TEST_F(NestedTypes, TFixture) {
MakeParser({"nested", "a1"}, {"Optional<Json>", "String"});

TJsonParserBuffer& buffer = Parser->GetBuffer();
buffer.AddValue(R"({"a1": "hello1", "nested": {"key": "value"}})");
buffer.AddValue(R"({"a1": "hello1", "nested": ["key1", "key2"]})");

ParsedValues = Parser->Parse();
ResultNumberValues = ParsedValues.front().size();
UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues);

const auto& nestedJson = GetParsedRow(0);
UNIT_ASSERT_VALUES_EQUAL(2, nestedJson.size());
UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", nestedJson.front());
UNIT_ASSERT_VALUES_EQUAL("hello1", nestedJson.back());

const auto& nestedList = GetParsedRow(1);
UNIT_ASSERT_VALUES_EQUAL(2, nestedList.size());
UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", nestedList.front());
UNIT_ASSERT_VALUES_EQUAL("hello1", nestedList.back());
}

Y_UNIT_TEST_F(StringTypeValidation, TFixture) {
MakeParser({"a1"}, {"String"});
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": 1234})"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type.");
}

Y_UNIT_TEST_F(JsonTypeValidation, TFixture) {
MakeParser({"a1"}, {"Int32"});
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"({"a1": {"key": "value"}})"), yexception, "Failed to parse json string, expected scalar type for column 'a1' with type Int32 but got nested json, please change column type to Json.");
}

Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) {
MakeParser({"a2", "a1"});
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type.");
Expand Down
31 changes: 31 additions & 0 deletions ydb/tests/fq/yds/test_row_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,37 @@ def test_scheme_error(self, kikimr, client):
stop_yds_query(client, query_id)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)

@yq_v1
def test_nested_types(self, kikimr, client):
client.create_yds_connection(
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
)
self.init_topics("test_nested_types")

sql = Rf'''
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data Json NOT NULL, event String NOT NULL))
WHERE event = "event1" or event = "event2";'''

query_id = start_yds_query(kikimr, client, sql)
wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)

data = [
'{"time": 101, "data": {"key": "value"}, "event": "event1"}',
'{"time": 102, "data": ["key1", "key2"], "event": "event2"}',
]

self.write_stream(data)
expected = [
'{"key": "value"}',
'["key1", "key2"]'
]
assert self.read_stream(len(expected), topic_path=self.output_topic) == expected

wait_actor_count(kikimr, "DQ_PQ_READ_ACTOR", 1)
stop_yds_query(client, query_id)

@yq_v1
def test_filter(self, kikimr, client):
client.create_yds_connection(
Expand Down

0 comments on commit 6595ec0

Please sign in to comment.