Skip to content

Commit

Permalink
YQ-3970 RD added parser mkql counters (ydb-platform#12607)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 13, 2024
1 parent fc43113 commit 62228f6
Show file tree
Hide file tree
Showing 24 changed files with 89 additions and 39 deletions.
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ void Init(
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
CreatePqNativeGateway(pqServices),
appData->Mon);
appData->Mon,
appData->Counters);
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ struct TActorFactory : public IActorFactory {
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize) const override {

Expand All @@ -35,6 +36,7 @@ struct TActorFactory : public IActorFactory {
std::move(driver),
credentialsProviderFactory,
counters,
countersRoot,
pqGateway,
maxBufferSize
);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ struct IActorFactory : public TThrRefBase {
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
ui64 maxBufferSize) const = 0;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,12 @@ TString TSchemaColumn::ToString() const {
return TStringBuilder() << "'" << Name << "' : " << TypeYson;
}

//// TCountersDesc

TCountersDesc TCountersDesc::CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const {
TCountersDesc result(*this);
result.MkqlCountersName = mkqlCountersName;
return result;
}

} // namespace NFq::NRowDispatcher
10 changes: 10 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <library/cpp/monlib/dynamic_counters/counters.h>

#include <ydb/library/conclusion/generic/result.h>
#include <ydb/library/conclusion/status.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
Expand All @@ -22,4 +24,12 @@ struct TSchemaColumn {
TString ToString() const;
};

struct TCountersDesc {
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
NMonitoring::TDynamicCounterPtr CountersSubgroup = MakeIntrusive<NMonitoring::TDynamicCounters>();
TString MkqlCountersName; // Used for TAlignedPagePoolCounters created from CountersRoot

[[nodiscard]] TCountersDesc CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const;
};

} // namespace NFq::NRowDispatcher
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/format_handler/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ SRCS(
)

PEERDIR(
library/cpp/monlib/dynamic_counters

ydb/library/conclusion
ydb/library/yql/dq/actors/protos

Expand Down
29 changes: 15 additions & 14 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,24 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
using TBase = NActors::TActor<TTopicFormatHandler>;

struct TCounters {
const NMonitoring::TDynamicCounterPtr CountersRoot;
const NMonitoring::TDynamicCounterPtr CountersSubgroup;
TCountersDesc Desc;

NMonitoring::TDynamicCounters::TCounterPtr ActiveFormatHandlers;
NMonitoring::TDynamicCounters::TCounterPtr ActiveClients;

TCounters(NMonitoring::TDynamicCounterPtr counters, const TSettings& settings)
: CountersRoot(counters)
, CountersSubgroup(counters->GetSubgroup("format", settings.ParsingFormat))
TCounters(const TCountersDesc& counters, const TSettings& settings)
: Desc(counters)
{
Desc.CountersSubgroup = Desc.CountersSubgroup->GetSubgroup("format", settings.ParsingFormat);

Register();
}

private:
void Register() {
ActiveFormatHandlers = CountersRoot->GetCounter("ActiveFormatHandlers", false);
ActiveFormatHandlers = Desc.CountersRoot->GetCounter("ActiveFormatHandlers", false);

ActiveClients = CountersSubgroup->GetCounter("ActiveClients", false);
ActiveClients = Desc.CountersSubgroup->GetCounter("ActiveClients", false);
}
};

Expand Down Expand Up @@ -282,9 +282,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
};

public:
TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, NMonitoring::TDynamicCounterPtr counters)
TTopicFormatHandler(const TFormatHandlerConfig& config, const TSettings& settings, const TCountersDesc& counters)
: TBase(&TTopicFormatHandler::StateFunc)
, TTypeParser(__LOCATION__)
, TTypeParser(__LOCATION__, counters.CopyWithNewMkqlCountersName("row_dispatcher"))
, Config(config)
, Settings(settings)
, LogPrefix(TStringBuilder() << "TTopicFormatHandler [" << Settings.ParsingFormat << "]: ")
Expand Down Expand Up @@ -487,18 +487,19 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

TValueStatus<ITopicParser::TPtr> CreateParserForFormat() const {
const auto& counters = Counters.Desc.CopyWithNewMkqlCountersName("row_dispatcher_parser");
if (Settings.ParsingFormat == "raw") {
return CreateRawParser(ParserHandler);
return CreateRawParser(ParserHandler, counters);
}
if (Settings.ParsingFormat == "json_each_row") {
return CreateJsonParser(ParserHandler, Config.JsonParserConfig);
return CreateJsonParser(ParserHandler, Config.JsonParserConfig, counters);
}
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Unsupported parsing format: " << Settings.ParsingFormat);
}

void CreateFilters() {
if (!Filters) {
Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.CountersSubgroup);
Filters = CreateTopicFilters(SelfId(), Config.FiltersConfig, Counters.Desc.CountersSubgroup);
}
}

Expand Down Expand Up @@ -567,7 +568,7 @@ void ITopicFormatHandler::TDestroy::Destroy(ITopicFormatHandler* handler) {
}
}

ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters) {
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters) {
const auto handler = new TTopicFormatHandler(config, settings, counters);
owner.RegisterWithSameMailbox(handler);
return ITopicFormatHandler::TPtr(handler);
Expand All @@ -585,7 +586,7 @@ TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConf
namespace NTests {

ITopicFormatHandler::TPtr CreateTestFormatHandler(const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings) {
const auto handler = new TTopicFormatHandler(config, settings, MakeIntrusive<NMonitoring::TDynamicCounters>());
const auto handler = new TTopicFormatHandler(config, settings, {});
NActors::TActivationContext::ActorSystem()->Register(handler);
return ITopicFormatHandler::TPtr(handler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ struct TFormatHandlerConfig {
TTopicFiltersConfig FiltersConfig;
};

ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, NMonitoring::TDynamicCounterPtr counters);
ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext& owner, const TFormatHandlerConfig& config, const ITopicFormatHandler::TSettings& settings, const TCountersDesc& counters);
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, NActors::TActorId compileServiceId);

namespace NTests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ class TJsonParser : public TTopicParserBase {
using TPtr = TIntrusivePtr<TJsonParser>;

public:
TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config)
: TBase(std::move(consumer), __LOCATION__)
TJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters)
: TBase(std::move(consumer), __LOCATION__, counters)
, Config(config)
, NumberColumns(Consumer->GetColumns().size())
, MaxNumberRows((config.BufferCellCount - 1) / NumberColumns + 1)
Expand Down Expand Up @@ -483,8 +483,8 @@ class TJsonParser : public TTopicParserBase {

} // anonymous namespace

TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config) {
TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config);
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters) {
TJsonParser::TPtr parser = MakeIntrusive<TJsonParser>(consumer, config, counters);
if (auto status = parser->InitColumnsParsers(); status.IsFail()) {
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ struct TJsonParserConfig {
ui64 BufferCellCount = 1000000; // (number rows) * (number columns) limit
};

TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config);
TValueStatus<ITopicParser::TPtr> CreateJsonParser(IParsedDataConsumer::TPtr consumer, const TJsonParserConfig& config, const TCountersDesc& counters);
TJsonParserConfig CreateJsonParserConfig(const NConfig::TJsonParserConfig& parserConfig);

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace NFq::NRowDispatcher {

//// TTypeParser

TTypeParser::TTypeParser(const TSourceLocation& location)
: Alloc(location, NKikimr::TAlignedPagePoolCounters(), true, false)
TTypeParser::TTypeParser(const TSourceLocation& location, const TCountersDesc& counters)
: Alloc(location, NKikimr::TAlignedPagePoolCounters(counters.CountersRoot, counters.MkqlCountersName), true, false)
, FunctionRegistry(NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {}))
, TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc))
, ProgramBuilder(std::make_unique<NKikimr::NMiniKQL::TProgramBuilder>(*TypeEnv, *FunctionRegistry))
Expand Down Expand Up @@ -57,8 +57,8 @@ void TTopicParserBase::TStats::Clear() {

//// TTopicParserBase

TTopicParserBase::TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location)
: TTypeParser(location)
TTopicParserBase::TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location, const TCountersDesc& counters)
: TTypeParser(location, counters)
, Consumer(std::move(consumer))
{}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NFq::NRowDispatcher {

class TTypeParser {
public:
explicit TTypeParser(const TSourceLocation& location);
explicit TTypeParser(const TSourceLocation& location, const TCountersDesc& counters);
virtual ~TTypeParser();

TValueStatus<NKikimr::NMiniKQL::TType*> ParseTypeYson(const TString& typeYson) const;
Expand All @@ -36,7 +36,7 @@ class TTopicParserBase : public ITopicParser, public TTypeParser {
using TPtr = TIntrusivePtr<TTopicParserBase>;

public:
TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location);
TTopicParserBase(IParsedDataConsumer::TPtr consumer, const TSourceLocation& location, const TCountersDesc& counters);
virtual ~TTopicParserBase() = default;

public:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TRawParser : public TTopicParserBase {
using TPtr = TIntrusivePtr<TRawParser>;

public:
TRawParser(IParsedDataConsumer::TPtr consumer, const TSchemaColumn& schema)
: TBase(std::move(consumer), __LOCATION__)
TRawParser(IParsedDataConsumer::TPtr consumer, const TSchemaColumn& schema, const TCountersDesc& counters)
: TBase(std::move(consumer), __LOCATION__, counters)
, Schema(schema)
, LogPrefix("TRawParser: ")
{}
Expand Down Expand Up @@ -108,13 +108,13 @@ class TRawParser : public TTopicParserBase {

} // anonymous namespace

TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer) {
TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer, const TCountersDesc& counters) {
const auto& columns = consumer->GetColumns();
if (columns.size() != 1) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Expected only one column for raw format, but got " << columns.size());
}

TRawParser::TPtr parser = MakeIntrusive<TRawParser>(consumer, columns[0]);
TRawParser::TPtr parser = MakeIntrusive<TRawParser>(consumer, columns[0], counters);
if (auto status = parser->InitColumnParser(); status.IsFail()) {
return status.AddParentIssue(TStringBuilder() << "Failed to create raw parser for column " << columns[0].ToString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

namespace NFq::NRowDispatcher {

TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer);
TValueStatus<ITopicParser::TPtr> CreateRawParser(IParsedDataConsumer::TPtr consumer, const TCountersDesc& counters);

} // namespace NFq::NRowDispatcher
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TBaseFixture::TBatch& TBaseFixture::TBatch::AddRow(TRow row) {
//// TBaseFixture

TBaseFixture::TBaseFixture()
: TTypeParser(__LOCATION__)
: TTypeParser(__LOCATION__, {})
, MemoryInfo("TBaseFixture alloc")
, HolderFactory(std::make_unique<NKikimr::NMiniKQL::THolderFactory>(Alloc.Ref(), MemoryInfo))
, Runtime(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class TJsonParserFixture : public TBaseParserFixture {

protected:
TValueStatus<ITopicParser::TPtr> CreateParser() override {
return CreateJsonParser(ParserHandler, Config);
return CreateJsonParser(ParserHandler, Config, {});
}

public:
Expand All @@ -172,7 +172,7 @@ class TJsonParserFixture : public TBaseParserFixture {
class TRawParserFixture : public TBaseParserFixture {
protected:
TValueStatus<ITopicParser::TPtr> CreateParser() override {
return CreateRawParser(ParserHandler);
return CreateRawParser(ParserHandler, {});
}
};

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
TString Tenant;
NFq::NRowDispatcher::IActorFactory::TPtr ActorFactory;
const ::NMonitoring::TDynamicCounterPtr Counters;
const ::NMonitoring::TDynamicCounterPtr CountersRoot;
TRowDispatcherMetrics Metrics;
NYql::IPqGateway::TPtr PqGateway;
NActors::TMon* Monitoring;
Expand Down Expand Up @@ -358,6 +359,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);

Expand Down Expand Up @@ -435,6 +437,7 @@ TRowDispatcher::TRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
: Config(config)
Expand All @@ -445,6 +448,7 @@ TRowDispatcher::TRowDispatcher(
, Tenant(tenant)
, ActorFactory(actorFactory)
, Counters(counters)
, CountersRoot(countersRoot)
, Metrics(counters)
, PqGateway(pqGateway)
, Monitoring(monitoring)
Expand Down Expand Up @@ -788,6 +792,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
ev->Get()->Record.GetToken(),
source.GetAddBearerToToken()),
Counters,
CountersRoot,
PqGateway,
MaxSessionBufferSizeBytes
);
Expand Down Expand Up @@ -1055,6 +1060,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
{
Expand All @@ -1066,6 +1072,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
tenant,
actorFactory,
counters,
countersRoot,
pqGateway,
monitoring));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcher(
const TString& tenant,
const NFq::NRowDispatcher::IActorFactory::TPtr& actorFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
const ::NMonitoring::TDynamicCounterPtr& countersRoot,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring)
NActors::TMon* monitoring,
::NMonitoring::TDynamicCounterPtr countersRoot)
{
return NewRowDispatcher(
config,
Expand All @@ -27,6 +28,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
tenant,
NFq::NRowDispatcher::CreateActorFactory(),
counters,
countersRoot,
pqGateway,
monitoring);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ std::unique_ptr<NActors::IActor> NewRowDispatcherService(
const TString& tenant,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NYql::IPqGateway::TPtr& pqGateway,
NActors::TMon* monitoring = nullptr);
NActors::TMon* monitoring = nullptr,
::NMonitoring::TDynamicCounterPtr countersRoot = MakeIntrusive<::NMonitoring::TDynamicCounters>());

} // namespace NFq
Loading

0 comments on commit 62228f6

Please sign in to comment.