diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index 28c6ee11ddbd..f78e5f691a84 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -2,6 +2,8 @@ #include +#include + namespace NFq::NRowDispatcher { @@ -17,6 +19,7 @@ struct TActorFactory : public IActorFactory { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const override { @@ -29,6 +32,7 @@ struct TActorFactory : public IActorFactory { partitionId, std::move(driver), credentialsProviderFactory, + pureCalcProgramFactory, counters, pqGateway ); diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index 4363a3b646f0..ce3d8be007c5 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace NFq::NRowDispatcher { @@ -20,6 +21,7 @@ struct IActorFactory : public TThrRefBase { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) const = 0; }; diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index fbc89e816b47..202369744f1d 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -263,15 +263,15 @@ class TJsonFilter::TImpl { TImpl(const TVector& columns, const TVector& types, const TString& whereFilter, - TCallback callback) + TCallback callback, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) : Sql(GenerateSql(whereFilter)) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); - auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()); // Program should be stateless because input values // allocated on another allocator and should be released LOG_ROW_DISPATCHER_DEBUG("Creating program..."); - Program = factory->MakePushStreamProgram( + Program = pureCalcProgramFactory->MakePushStreamProgram( TFilterInputSpec(MakeInputSchema(columns, types)), TFilterOutputSpec(MakeOutputSchema()), Sql, @@ -311,8 +311,9 @@ TJsonFilter::TJsonFilter( const TVector& columns, const TVector& types, const TString& whereFilter, - TCallback callback) - : Impl(std::make_unique(columns, types, whereFilter, callback)) { + TCallback callback, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) + : Impl(std::make_unique(columns, types, whereFilter, callback, pureCalcProgramFactory)) { } TJsonFilter::~TJsonFilter() { @@ -330,8 +331,9 @@ std::unique_ptr NewJsonFilter( const TVector& columns, const TVector& types, const TString& whereFilter, - TCallback callback) { - return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback)); + TCallback callback, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) { + return std::unique_ptr(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index c4435bd9bab7..09401c6a9b86 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace NFq { @@ -13,7 +14,8 @@ class TJsonFilter { const TVector& columns, const TVector& types, const TString& whereFilter, - TCallback callback); + TCallback callback, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); ~TJsonFilter(); @@ -29,6 +31,7 @@ std::unique_ptr NewJsonFilter( const TVector& columns, const TVector& types, const TString& whereFilter, - TJsonFilter::TCallback callback); + TJsonFilter::TCallback callback, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index a695ee23c7fd..d3e3b4332337 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -119,6 +120,7 @@ class TRowDispatcher : public TActorBootstrapped { NConfig::TRowDispatcherConfig Config; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; + NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; TYqSharedResources::TPtr YqSharedResources; TMaybe CoordinatorActorId; TSet CoordinatorChangedSubscribers; @@ -264,6 +266,7 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) + , PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) , YqSharedResources(yqSharedResources) , CredentialsFactory(credentialsFactory) , LogPrefix("RowDispatcher: ") @@ -436,6 +439,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { CredentialsFactory, ev->Get()->Record.GetToken(), source.GetAddBearerToToken()), + PureCalcProgramFactory, Counters, PqGateway ); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index a2b5e2dbb2a2..73651c427592 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -149,6 +149,7 @@ class TTopicSession : public TActorBootstrapped { ui32 PartitionId; NYdb::TDriver Driver; std::shared_ptr CredentialsProviderFactory; + NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; NYql::ITopicClient::TPtr TopicClient; std::shared_ptr ReadSession; const i64 BufferSize; @@ -179,6 +180,7 @@ class TTopicSession : public TActorBootstrapped { ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); @@ -268,6 +270,7 @@ TTopicSession::TTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) : TopicPath(topicPath) @@ -277,6 +280,7 @@ TTopicSession::TTopicSession( , PartitionId(partitionId) , Driver(std::move(driver)) , CredentialsProviderFactory(credentialsProviderFactory) + , PureCalcProgramFactory(pureCalcProgramFactory) , BufferSize(16_MB) , LogPrefix("TopicSession") , Config(config) @@ -734,7 +738,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { predicate, [&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){ Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId)); - }); + }, + PureCalcProgramFactory); } else { ClientsWithoutPredicate.insert(ev->Sender); } @@ -959,9 +964,10 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway) { - return std::unique_ptr(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway)); + return std::unique_ptr(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index 24a00be2c367..54f8b3510b11 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -8,6 +8,7 @@ #include #include +#include #include @@ -24,6 +25,7 @@ std::unique_ptr NewTopicSession( ui32 partitionId, NYdb::TDriver driver, std::shared_ptr credentialsProviderFactory, + NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::IPqGateway::TPtr& pqGateway); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index fecc26e4a73c..d17ec36cf673 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -10,6 +10,7 @@ #include #include +#include #include @@ -22,16 +23,24 @@ class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : Runtime(true) + : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + , Runtime(true) , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) {} + static void SegmentationFaultHandler(int) { + Cerr << "segmentation fault call stack:" << Endl; + FormatBackTrace(&Cerr); + abort(); + } + void SetUp(NUnitTest::TTestContext&) override { + NKikimr::EnableYDBBacktraceFormat(); + signal(SIGSEGV, &SegmentationFaultHandler); + TAutoPtr app = new TAppPrepare(); Runtime.Initialize(app->Unwrap()); Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); - - NKikimr::EnableYDBBacktraceFormat(); } void TearDown(NUnitTest::TTestContext& /* context */) override { @@ -55,7 +64,8 @@ class TFixture : public NUnitTest::TBaseFixture { columns, types, whereFilter, - callback); + callback, + PureCalcProgramFactory); } const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { @@ -90,8 +100,9 @@ class TFixture : public NUnitTest::TBaseFixture { }); } - TActorSystemStub actorSystemStub; + NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; + TActorSystemStub ActorSystemStub; std::unique_ptr Filter; NKikimr::NMiniKQL::TScopedAlloc Alloc; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index 8186ee2e4384..bafd824fb48b 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -35,6 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { ui32 /*partitionId*/, NYdb::TDriver /*driver*/, std::shared_ptr /*credentialsProviderFactory*/, + NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/) const override { auto actorId = Runtime.AllocateEdgeActor(); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 86afe8934a68..cd1e60932716 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -13,6 +13,7 @@ #include #include +#include namespace { @@ -24,10 +25,11 @@ const ui64 TimeoutBeforeStartSessionSec = 3; const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec; class TFixture : public NUnitTest::TBaseFixture { - public: TFixture() - : Runtime(true) {} + : PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions())) + , Runtime(true) + {} void SetUp(NUnitTest::TTestContext&) override { TAutoPtr app = new TAppPrepare(); @@ -68,6 +70,7 @@ class TFixture : public NUnitTest::TBaseFixture { 0, Driver, CredentialsProviderFactory, + PureCalcProgramFactory, MakeIntrusive(), CreatePqNativeGateway(pqServices) ).release()); @@ -155,8 +158,9 @@ class TFixture : public NUnitTest::TBaseFixture { return eventHolder->Get()->Record.MessagesSize(); } - TActorSystemStub actorSystemStub; + NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory; NActors::TTestActorRuntime Runtime; + TActorSystemStub ActorSystemStub; NActors::TActorId TopicSession; NActors::TActorId RowDispatcherActorId; NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")));