Skip to content

Commit

Permalink
YQ-3892 fix data race in json filters (ydb-platform#11827)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 21, 2024
1 parent 72f573d commit 913245a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 1 deletion.
5 changes: 5 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class TPureCalcProgramFactory : public IPureCalcProgramFactory {
return it->second;
}

TGuard<TMutex> LockFactory() const override {
return Guard(FactoryMutex);
}

private:
void CreateFactory(const TSettings& settings) {
ProgramFactories.insert({settings, NYql::NPureCalc::MakeProgramFactory(
Expand All @@ -31,6 +35,7 @@ class TPureCalcProgramFactory : public IPureCalcProgramFactory {

private:
std::map<TSettings, NYql::NPureCalc::IProgramFactoryPtr> ProgramFactories;
TMutex FactoryMutex;
};

} // anonymous namespace
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/common.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <util/generic/ptr.h>
#include <util/system/mutex.h>

#include <ydb/library/yql/public/purecalc/common/fwd.h>

Expand All @@ -18,6 +19,9 @@ class IPureCalcProgramFactory : public TThrRefBase {

public:
virtual NYql::NPureCalc::IProgramFactoryPtr GetFactory(const TSettings& settings) const = 0;

// Before creating purecalc program factory should be locked
virtual TGuard<TMutex> LockFactory() const = 0;
};

IPureCalcProgramFactory::TPtr CreatePureCalcProgramFactory();
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,13 @@ class TJsonFilter::TImpl {
TCallback callback,
IPureCalcProgramFactory::TPtr pureCalcProgramFactory,
const IPureCalcProgramFactory::TSettings& factorySettings)
: Sql(GenerateSql(whereFilter, factorySettings)) {
: PureCalcProgramFactory(pureCalcProgramFactory)
, Sql(GenerateSql(whereFilter, factorySettings)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");

// Shared factory may change during compilation, so it should be locked
auto guard = pureCalcProgramFactory->LockFactory();

// Program should be stateless because input values
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Expand All @@ -304,6 +308,12 @@ class TJsonFilter::TImpl {
return Sql;
}

~TImpl() {
auto guard = PureCalcProgramFactory->LockFactory();
InputConsumer.Reset();
Program.Reset();
}

private:
TString GenerateSql(const TString& whereFilter, const IPureCalcProgramFactory::TSettings& factorySettings) {
TStringStream str;
Expand All @@ -317,6 +327,7 @@ class TJsonFilter::TImpl {
}

private:
const IPureCalcProgramFactory::TPtr PureCalcProgramFactory;
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<TInputType>> InputConsumer;
const TString Sql;
Expand Down

0 comments on commit 913245a

Please sign in to comment.