Skip to content

Commit

Permalink
YQ RD optimized purecalc memory usage (ydb-platform#11394)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Nov 8, 2024
1 parent 202eb87 commit c38def0
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 19 deletions.
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>

#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace NFq::NRowDispatcher {


Expand All @@ -17,6 +19,7 @@ struct TActorFactory : public IActorFactory {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const override {

Expand All @@ -29,6 +32,7 @@ struct TActorFactory : public IActorFactory {
partitionId,
std::move(driver),
credentialsProviderFactory,
pureCalcProgramFactory,
counters,
pqGateway
);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq::NRowDispatcher {

Expand All @@ -20,6 +21,7 @@ struct IActorFactory : public TThrRefBase {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) const = 0;
};
Expand Down
16 changes: 9 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ class TJsonFilter::TImpl {
TImpl(const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Sql(GenerateSql(whereFilter)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());

// Program should be stateless because input values are
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = factory->MakePushStreamProgram(
Program = pureCalcProgramFactory->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
Expand Down Expand Up @@ -311,8 +311,9 @@ TJsonFilter::TJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback)) {
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory)
: Impl(std::make_unique<TJsonFilter::TImpl>(columns, types, whereFilter, callback, pureCalcProgramFactory)) {
}

TJsonFilter::~TJsonFilter() {
Expand All @@ -330,8 +331,9 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback));
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory) {
return std::unique_ptr<TJsonFilter>(new TJsonFilter(columns, types, whereFilter, callback, pureCalcProgramFactory));
}

} // namespace NFq
7 changes: 5 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

namespace NFq {

Expand All @@ -13,7 +14,8 @@ class TJsonFilter {
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback);
TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);

~TJsonFilter();

Expand All @@ -29,6 +31,7 @@ std::unique_ptr<TJsonFilter> NewJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TJsonFilter::TCallback callback);
TJsonFilter::TCallback callback,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory);

} // namespace NFq
4 changes: 4 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/library/actors/core/interconnect.h>
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/events/events.h>
Expand Down Expand Up @@ -119,6 +120,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {

NConfig::TRowDispatcherConfig Config;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
TYqSharedResources::TPtr YqSharedResources;
TMaybe<TActorId> CoordinatorActorId;
TSet<TActorId> CoordinatorChangedSubscribers;
Expand Down Expand Up @@ -264,6 +266,7 @@ TRowDispatcher::TRowDispatcher(
const NYql::IPqGateway::TPtr& pqGateway)
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, YqSharedResources(yqSharedResources)
, CredentialsFactory(credentialsFactory)
, LogPrefix("RowDispatcher: ")
Expand Down Expand Up @@ -436,6 +439,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
CredentialsFactory,
ev->Get()->Record.GetToken(),
source.GetAddBearerToToken()),
PureCalcProgramFactory,
Counters,
PqGateway
);
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 PartitionId;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
NYql::ITopicClient::TPtr TopicClient;
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
const i64 BufferSize;
Expand Down Expand Up @@ -179,6 +180,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down Expand Up @@ -268,6 +270,7 @@ TTopicSession::TTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway)
: TopicPath(topicPath)
Expand All @@ -277,6 +280,7 @@ TTopicSession::TTopicSession(
, PartitionId(partitionId)
, Driver(std::move(driver))
, CredentialsProviderFactory(credentialsProviderFactory)
, PureCalcProgramFactory(pureCalcProgramFactory)
, BufferSize(16_MB)
, LogPrefix("TopicSession")
, Config(config)
Expand Down Expand Up @@ -734,7 +738,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
predicate,
[&, actorId = clientInfo.ReadActorId](ui64 offset, const TString& json){
Send(SelfId(), new NFq::TEvPrivate::TEvDataAfterFilteration(offset, json, actorId));
});
},
PureCalcProgramFactory);
} else {
ClientsWithoutPredicate.insert(ev->Sender);
}
Expand Down Expand Up @@ -959,9 +964,10 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway) {
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway));
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, partitionId, std::move(driver), credentialsProviderFactory, pureCalcProgramFactory, counters, pqGateway));
}

} // namespace NFq
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/topic_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/fwd.h>

#include <ydb/library/actors/core/actor.h>

Expand All @@ -24,6 +25,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
NYql::NPureCalc::IProgramFactoryPtr pureCalcProgramFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway);

Expand Down
21 changes: 16 additions & 5 deletions ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/testlib/actor_helpers.h>

#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <library/cpp/testing/unittest/registar.h>

Expand All @@ -22,16 +23,24 @@ class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: Runtime(true)
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, Runtime(true)
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
{}

// Diagnostic handler for segmentation faults in this test fixture: writes a
// header line and a formatted call stack to Cerr, then terminates via abort().
// The signal number passed by the runtime is ignored.
// NOTE(review): stream output and backtrace formatting are presumably not
// async-signal-safe; confirm this is acceptable for test-only diagnostics.
static void SegmentationFaultHandler(int /* signalNumber */) {
    Cerr << "segmentation fault call stack:" << Endl;
    FormatBackTrace(&Cerr);
    abort();
}

void SetUp(NUnitTest::TTestContext&) override {
NKikimr::EnableYDBBacktraceFormat();
signal(SIGSEGV, &SegmentationFaultHandler);

TAutoPtr<TAppPrepare> app = new TAppPrepare();
Runtime.Initialize(app->Unwrap());
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG);

NKikimr::EnableYDBBacktraceFormat();
}

void TearDown(NUnitTest::TTestContext& /* context */) override {
Expand All @@ -55,7 +64,8 @@ class TFixture : public NUnitTest::TBaseFixture {
columns,
types,
whereFilter,
callback);
callback,
PureCalcProgramFactory);
}

const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {
Expand Down Expand Up @@ -90,8 +100,9 @@ class TFixture : public NUnitTest::TBaseFixture {
});
}

TActorSystemStub actorSystemStub;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
std::unique_ptr<NFq::TJsonFilter> Filter;

NKikimr::NMiniKQL::TScopedAlloc Alloc;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
ui32 /*partitionId*/,
NYdb::TDriver /*driver*/,
std::shared_ptr<NYdb::ICredentialsProviderFactory> /*credentialsProviderFactory*/,
NYql::NPureCalc::IProgramFactoryPtr /*pureCalcProgramFactory*/,
const ::NMonitoring::TDynamicCounterPtr& /*counters*/,
const NYql::IPqGateway::TPtr& /*pqGateway*/) const override {
auto actorId = Runtime.AllocateEdgeActor();
Expand Down
10 changes: 7 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/tests/fq/pq_async_io/ut_helpers.h>

#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

namespace {

Expand All @@ -24,10 +25,11 @@ const ui64 TimeoutBeforeStartSessionSec = 3;
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;

class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: Runtime(true) {}
: PureCalcProgramFactory(NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()))
, Runtime(true)
{}

void SetUp(NUnitTest::TTestContext&) override {
TAutoPtr<TAppPrepare> app = new TAppPrepare();
Expand Down Expand Up @@ -68,6 +70,7 @@ class TFixture : public NUnitTest::TBaseFixture {
0,
Driver,
CredentialsProviderFactory,
PureCalcProgramFactory,
MakeIntrusive<NMonitoring::TDynamicCounters>(),
CreatePqNativeGateway(pqServices)
).release());
Expand Down Expand Up @@ -155,8 +158,9 @@ class TFixture : public NUnitTest::TBaseFixture {
return eventHolder->Get()->Record.MessagesSize();
}

TActorSystemStub actorSystemStub;
NYql::NPureCalc::IProgramFactoryPtr PureCalcProgramFactory;
NActors::TTestActorRuntime Runtime;
TActorSystemStub ActorSystemStub;
NActors::TActorId TopicSession;
NActors::TActorId RowDispatcherActorId;
NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")));
Expand Down

0 comments on commit c38def0

Please sign in to comment.