Skip to content

Commit

Permalink
MonPage/ReconnectPeriod to q-stable-2024-07-08 (ydb-platform#11451)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Nov 11, 2024
1 parent 02234c6 commit 2c99cf0
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 12 deletions.
1 change: 1 addition & 0 deletions ydb/core/fq/libs/config/protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ message TCommonConfig {
bool DisableSslForGenericDataSources = 15;
bool ShowQueryTimeline = 16;
uint64 MaxQueryTimelineSize = 17; // default: 200KB
string PqReconnectPeriod = 18; // default: disabled
}
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ void Init(
appData->FunctionRegistry
);
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)),
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());

s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());
Expand Down
31 changes: 27 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/public/purecalc/common/interface.h>

#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/mon/mon.h>

#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
Expand Down Expand Up @@ -223,11 +225,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
void Handle(const NMon::TEvHttpInfo::TPtr&);

void DeleteConsumer(const ConsumerSessionKey& key);
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
void UpdateMetrics();
void PrintInternalState();
TString GetInternalState();

STRICT_STFUNC(
StateFunc, {
Expand All @@ -252,6 +255,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
hFunc(NMon::TEvHttpInfo, Handle);
})
};

Expand Down Expand Up @@ -287,6 +291,13 @@ void TRowDispatcher::Bootstrap() {
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());

NActors::TMon* mon = NKikimr::AppData()->Mon;
if (mon) {
::NMonitoring::TIndexMonPage* actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
mon->RegisterActorPage(actorsMonPage, "row_dispatcher", "Row Dispatcher", false,
TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
}
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
Expand Down Expand Up @@ -372,7 +383,7 @@ void TRowDispatcher::UpdateMetrics() {
}
}

void TRowDispatcher::PrintInternalState() {
TString TRowDispatcher::GetInternalState() {
TStringStream str;
str << "Statistics:\n";
for (auto& [key, sessionsInfo] : TopicSessions) {
Expand All @@ -390,7 +401,7 @@ void TRowDispatcher::PrintInternalState() {
}
}
}
LOG_ROW_DISPATCHER_DEBUG(str.Str());
return str.Str();
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
Expand Down Expand Up @@ -632,10 +643,22 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
}

void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
PrintInternalState();
LOG_ROW_DISPATCHER_DEBUG(GetInternalState());
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
}

// Monitoring page handler: renders the text produced by GetInternalState()
// inside the HTML()/PRE() template scopes and sends the resulting page back
// to the actor that issued the TEvHttpInfo request.
void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
    TStringStream page;
    HTML(page) {
        PRE() {
            // Header line, the state dump itself, then a trailing blank line.
            page << "Current state:" << Endl
                 << GetInternalState() << Endl
                 << Endl;
        }
    }
    // Reply to the requester with the rendered page.
    Send(ev->Sender, new NMon::TEvHttpInfoRes(page.Str()));
}

void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
const auto& key = ev->Get()->Stat.SessionKey;
Expand Down
11 changes: 7 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
if (Settings.HasOffset()) {
NextMessageOffset = Settings.GetOffset();
}
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
}
NFq::NRowDispatcherProto::TEvStartSession Settings;
NActors::TActorId ReadActorId;
Expand All @@ -119,6 +120,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
TMaybe<ui64> NextMessageOffset;
ui64 LastSendedNextMessageOffset = 0;
TVector<ui64> FieldsIds;
TDuration ReconnectPeriod;
};

struct TTopicEventProcessor {
Expand Down Expand Up @@ -239,6 +241,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
hFunc(NFq::TEvPrivate::TEvStatus, Handle);
hFunc(NFq::TEvPrivate::TEvDataFiltered, Handle);
hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
hFunc(NFq::TEvPrivate::TEvReconnectSession, Handle);
hFunc(TEvRowDispatcher::TEvGetNextBatch, Handle);
hFunc(NFq::TEvRowDispatcher::TEvStartSession, Handle);
sFunc(NFq::TEvPrivate::TEvStartParsing, DoParsing);
Expand Down Expand Up @@ -387,15 +390,15 @@ void TTopicSession::CreateTopicSession() {
SubscribeOnNextEvent();
}

if (!InflightReconnect) {
if (!InflightReconnect && Clients) {
// All clients are treated alike here: take the reconnect period from any one of them (the first entry in Clients).
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
Y_UNUSED(TDuration::TryParse(sourceParams.GetReconnectPeriod(), ReconnectPeriod));
ReconnectPeriod = Clients.begin()->second.ReconnectPeriod;
if (ReconnectPeriod != TDuration::Zero()) {
LOG_ROW_DISPATCHER_INFO("ReconnectPeriod " << ReconnectPeriod.ToString());
Metrics.ReconnectRate->Inc();
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
InflightReconnect = true;
}
InflightReconnect = true;
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ SRCS(
PEERDIR(
contrib/libs/fmt
contrib/libs/simdjson
ydb/core/base
ydb/core/fq/libs/actors/logging
ydb/core/fq/libs/config/protos
ydb/core/fq/libs/control_plane_storage
ydb/core/fq/libs/row_dispatcher/events
ydb/core/fq/libs/shared_resources
ydb/core/fq/libs/ydb
ydb/core/mon
ydb/library/actors/core
ydb/library/security
ydb/library/yql/dq/actors/common
Expand Down
8 changes: 6 additions & 2 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -655,14 +655,18 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
return {actor, actor};
}

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) {
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod) {
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway](
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod](
NPq::NProto::TDqPqTopicSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args)
{
NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER));

if (reconnectPeriod) {
settings.SetReconnectPeriod(reconnectPeriod);
}

if (!settings.GetSharedReading()) {
return CreateDqPqReadActor(
std::move(settings),
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
i64 bufferSize = PQReadDefaultFreeSpace
);

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {});

} // namespace NYql::NDq

0 comments on commit 2c99cf0

Please sign in to comment.