From 1560482072bf4e226b8619e9d51a102f872c8bf5 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 23 Oct 2024 16:35:39 +0000 Subject: [PATCH] Added ref locking --- .../fq/libs/row_dispatcher/json_parser.cpp | 23 +++++++++++++------ .../libs/row_dispatcher/ut/json_filter_ut.cpp | 6 +++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 69a592f7fe19..a0e79e7b02b0 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -153,10 +153,7 @@ class TJsonParser::TImpl { LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); with_lock (Alloc) { - for (auto& parsedColumn : ParsedValues) { - parsedColumn.clear(); - parsedColumn.reserve(Buffer.NumberValues); - } + ClearColumns(Buffer.NumberValues); size_t rowId = 0; simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE); @@ -173,6 +170,7 @@ class TJsonParser::TImpl { try { parsedColumn.emplace_back(ParseJsonValue(columnDesc.Type, item.value())); + Alloc.Ref().LockObject(parsedColumn.back()); } catch (...) { throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson << ", description: " << CurrentExceptionMessage(); } @@ -202,9 +200,20 @@ class TJsonParser::TImpl { ~TImpl() { Alloc.Acquire(); + ClearColumns(0); } private: + void ClearColumns(size_t reserveSize) { + for (auto& parsedColumn : ParsedValues) { + for (const auto& value : parsedColumn) { + Alloc.Ref().UnlockObject(value); + } + parsedColumn.clear(); + parsedColumn.reserve(reserveSize); + } + } + void ResizeColumn(const TColumnDescription& columnDesc, NKikimr::NMiniKQL::TUnboxedValueVector& parsedColumn, size_t size) const { if (columnDesc.Type->IsOptional()) { parsedColumn.resize(size); @@ -213,7 +222,7 @@ class TJsonParser::TImpl { } } - NYql::NUdf::TUnboxedValue ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { + NYql::NUdf::TUnboxedValuePod ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { switch (type->GetKind()) { case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); @@ -236,7 +245,7 @@ class TJsonParser::TImpl { } } - NYql::NUdf::TUnboxedValue ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { + NYql::NUdf::TUnboxedValuePod ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); switch (jsonValue.type()) { case simdjson::fallback::ondemand::json_type::number: { @@ -307,7 +316,7 @@ class TJsonParser::TImpl { } template - static NYql::NUdf::TUnboxedValue ParseJsonNumber(TJsonNumber number) { + static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) { if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { throw yexception() << "number is out of range"; } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 83d20a37ecf1..fecc26e4a73c 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -36,6 +36,11 @@ class TFixture : public NUnitTest::TBaseFixture { void TearDown(NUnitTest::TTestContext& /* context */) override { with_lock (Alloc) { + for (const auto& holder : Holders) { + for (const auto& value : holder) { + Alloc.Ref().UnlockObject(value); + } + } Holders.clear(); } Filter.reset(); @@ -58,6 +63,7 @@ class TFixture : public NUnitTest::TBaseFixture { Holders.emplace_front(); for (size_t i = 0; i < size; ++i) { Holders.front().emplace_back(valueCreator(i)); + Alloc.Ref().LockObject(Holders.front().back()); } return &Holders.front(); }