Skip to content

Commit

Permalink
Added ref locking
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Oct 23, 2024
1 parent c4bb730 commit 1560482
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
23 changes: 16 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,7 @@ class TJsonParser::TImpl {
LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values);

with_lock (Alloc) {
for (auto& parsedColumn : ParsedValues) {
parsedColumn.clear();
parsedColumn.reserve(Buffer.NumberValues);
}
ClearColumns(Buffer.NumberValues);

size_t rowId = 0;
simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE);
Expand All @@ -173,6 +170,7 @@ class TJsonParser::TImpl {

try {
parsedColumn.emplace_back(ParseJsonValue(columnDesc.Type, item.value()));
Alloc.Ref().LockObject(parsedColumn.back());
} catch (...) {
throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson << ", description: " << CurrentExceptionMessage();
}
Expand Down Expand Up @@ -202,9 +200,20 @@ class TJsonParser::TImpl {

~TImpl() {
Alloc.Acquire();
ClearColumns(0);
}

private:
void ClearColumns(size_t reserveSize) {
for (auto& parsedColumn : ParsedValues) {
for (const auto& value : parsedColumn) {
Alloc.Ref().UnlockObject(value);
}
parsedColumn.clear();
parsedColumn.reserve(reserveSize);
}
}

void ResizeColumn(const TColumnDescription& columnDesc, NKikimr::NMiniKQL::TUnboxedValueVector& parsedColumn, size_t size) const {
if (columnDesc.Type->IsOptional()) {
parsedColumn.resize(size);
Expand All @@ -213,7 +222,7 @@ class TJsonParser::TImpl {
}
}

NYql::NUdf::TUnboxedValue ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const {
NYql::NUdf::TUnboxedValuePod ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const {
switch (type->GetKind()) {
case NKikimr::NMiniKQL::TTypeBase::EKind::Data: {
const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type);
Expand All @@ -236,7 +245,7 @@ class TJsonParser::TImpl {
}
}

NYql::NUdf::TUnboxedValue ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const {
NYql::NUdf::TUnboxedValuePod ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const {
const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot);
switch (jsonValue.type()) {
case simdjson::fallback::ondemand::json_type::number: {
Expand Down Expand Up @@ -307,7 +316,7 @@ class TJsonParser::TImpl {
}

template <typename TResult, typename TJsonNumber>
static NYql::NUdf::TUnboxedValue ParseJsonNumber(TJsonNumber number) {
static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) {
if (number < std::numeric_limits<TResult>::min() || std::numeric_limits<TResult>::max() < number) {
throw yexception() << "number is out of range";
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class TFixture : public NUnitTest::TBaseFixture {

void TearDown(NUnitTest::TTestContext& /* context */) override {
with_lock (Alloc) {
for (const auto& holder : Holders) {
for (const auto& value : holder) {
Alloc.Ref().UnlockObject(value);
}
}
Holders.clear();
}
Filter.reset();
Expand All @@ -58,6 +63,7 @@ class TFixture : public NUnitTest::TBaseFixture {
Holders.emplace_front();
for (size_t i = 0; i < size; ++i) {
Holders.front().emplace_back(valueCreator(i));
Alloc.Ref().LockObject(Holders.front().back());
}
return &Holders.front();
}
Expand Down

0 comments on commit 1560482

Please sign in to comment.