From e8e7d789db7f4e3574b3dec2af576b7577186ae8 Mon Sep 17 00:00:00 2001 From: Oleg Doronin Date: Thu, 7 Nov 2024 10:58:37 +0100 Subject: [PATCH] Reconnect period (#11343) --- .../fq/libs/row_dispatcher/topic_session.cpp | 29 +++++++++++++++++++ .../common/proto/gateways_config.proto | 3 +- .../pq/async_io/dq_pq_read_actor.cpp | 26 +++++++++++++++++ .../yql/providers/pq/common/yql_names.h | 1 + .../yql/providers/pq/proto/dq_io.proto | 1 + .../pq/provider/yql_pq_dq_integration.cpp | 3 ++ .../providers/pq/provider/yql_pq_settings.cpp | 1 + .../providers/pq/provider/yql_pq_settings.h | 1 + 8 files changed, 64 insertions(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 2072f2d7ea86..a2b5e2dbb2a2 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -33,6 +33,7 @@ struct TTopicSessionMetrics { InFlyAsyncInputData = SubGroup->GetCounter("InFlyAsyncInputData"); RowsRead = SubGroup->GetCounter("RowsRead", true); InFlySubscribe = SubGroup->GetCounter("InFlySubscribe"); + ReconnectRate = SubGroup->GetCounter("ReconnectRate", true); } ~TTopicSessionMetrics() { @@ -43,6 +44,7 @@ struct TTopicSessionMetrics { ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; ::NMonitoring::TDynamicCounters::TCounterPtr RowsRead; ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; + ::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate; }; struct TEvPrivate { @@ -56,6 +58,7 @@ struct TEvPrivate { EvDataFiltered, EvSendStatistic, EvStartParsing, + EvReconnectSession, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); @@ -66,6 +69,7 @@ struct TEvPrivate { struct TEvSendStatistic : public NActors::TEventLocal {}; struct TEvStatus : public NActors::TEventLocal {}; struct TEvStartParsing : public NActors::TEventLocal {}; + struct TEvReconnectSession : public NActors::TEventLocal {}; struct TEvDataFiltered : public NActors::TEventLocal { explicit TEvDataFiltered(ui64 offset) @@ -136,6 +140,8 @@ class TTopicSession : public TActorBootstrapped { TParserInputType InputType; }; + bool InflightReconnect = false; + TDuration ReconnectPeriod; const TString TopicPath; const TString Endpoint; const TString Database; @@ -205,6 +211,7 @@ class TTopicSession : public TActorBootstrapped { void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&); void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&); + void Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&); void Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr&); void Handle(NFq::TEvPrivate::TEvStatus::TPtr&); void Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr&); @@ -375,6 +382,17 @@ void TTopicSession::CreateTopicSession() { ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams)); SubscribeOnNextEvent(); } + + if (!InflightReconnect) { + // Use any sourceParams. + const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource(); + Y_UNUSED(TDuration::TryParse(sourceParams.GetReconnectPeriod(), ReconnectPeriod)); + if (ReconnectPeriod != TDuration::Zero()) { + Metrics.ReconnectRate->Inc(); + Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession()); + } + InflightReconnect = true; + } } void TTopicSession::Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&) { @@ -432,6 +450,17 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvStatus::TPtr&) { } } +void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) { + Metrics.ReconnectRate->Inc(); + TInstant minTime = GetMinStartingMessageTimestamp(); + LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, Path " << TopicPath + << ", StartingMessageTimestamp " << minTime + << ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer()); + StopReadSession(); + CreateTopicSession(); + Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession()); +} + void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, last offset " << ev->Get()->Offset); for (auto& [actorId, info] : Clients) { diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index dbb433a4e2f1..26723da14aea 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -326,7 +326,8 @@ message TPqClusterConfig { optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token optional string DatabaseId = 12; repeated TAttr Settings = 100; - optional bool SharedReading = 101; + optional bool SharedReading = 101; + optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m } message TPqGatewayConfig { diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 9d9d5a03da5a..839e4dfec351 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -74,6 +74,7 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvSourceDataReady = EvBegin, + EvReconnectSession, EvEnd }; @@ -83,6 +84,7 @@ struct TEvPrivate { // Events struct TEvSourceDataReady : public TEventLocal {}; + struct TEvReconnectSession : public TEventLocal {}; }; } // namespace @@ -98,6 +100,7 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); InFlySubscribe = task->GetCounter("InFlySubscribe"); AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true); + ReconnectRate = task->GetCounter("ReconnectRate", true); } ~TMetrics() { @@ -110,6 +113,7 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; ::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate; + ::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate; }; public: @@ -139,6 +143,7 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , PqGateway(pqGateway) { + Y_UNUSED(TDuration::TryParse(SourceParams.GetReconnectPeriod(), ReconnectPeriod)); MetadataFields.reserve(SourceParams.MetadataFieldsSize()); TPqMetaExtractor fieldsExtractor; for (const auto& fieldName : SourceParams.GetMetadataFields()) { @@ -209,6 +214,7 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: private: STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvSourceDataReady, Handle); + hFunc(TEvPrivate::TEvReconnectSession, Handle); ) void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) { @@ -222,6 +228,18 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } + void Handle(TEvPrivate::TEvReconnectSession::TPtr&) { + SRC_LOG_D("SessionId: " << GetSessionId() << ", Reconnect epoch: " << Metrics.ReconnectRate->Val()); + Metrics.ReconnectRate->Inc(); + if (ReadSession) { + ReadSession->Close(TDuration::Zero()); + ReadSession.reset(); + ReadyBuffer = std::queue{}; // clear read buffer + } + + Schedule(ReconnectPeriod, new TEvPrivate::TEvReconnectSession()); + } + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor std::queue empty; @@ -259,6 +277,12 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const auto now = TInstant::Now(); MaybeScheduleNextIdleCheck(now); + if (!InflightReconnect && ReconnectPeriod != TDuration::Zero()) { + Metrics.ReconnectRate->Inc(); + Schedule(ReconnectPeriod, new TEvPrivate::TEvSourceDataReady()); + InflightReconnect = true; + } + i64 usedSpace = 0; if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { return usedSpace; @@ -565,6 +589,8 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: }; private: + bool InflightReconnect = false; + TDuration ReconnectPeriod; TMetrics Metrics; const i64 BufferSize; const THolderFactory& HolderFactory; diff --git a/ydb/library/yql/providers/pq/common/yql_names.h b/ydb/library/yql/providers/pq/common/yql_names.h index c4f6eeb3cc42..b26850e3004b 100644 --- a/ydb/library/yql/providers/pq/common/yql_names.h +++ b/ydb/library/yql/providers/pq/common/yql_names.h @@ -15,5 +15,6 @@ constexpr TStringBuf WatermarksEnableSetting = "WatermarksEnable"; constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs"; constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs"; constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions"; +constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod"; } // namespace NYql diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index 1f9a17b71782..565563647a82 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -37,6 +37,7 @@ message TDqPqTopicSource { repeated string ColumnTypes = 13; string Predicate = 14; bool SharedReading = 15; + string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m } message TDqPqTopicSink { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 9c044025fca7..99a4bd187ee2 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -221,6 +221,8 @@ class TPqDqIntegration: public TDqIntegrationBase { srcDesc.SetEndpoint(TString(Value(setting))); } else if (name == SharedReading) { sharedReading = FromString(Value(setting)); + } else if (name == ReconnectPeriod) { + srcDesc.SetReconnectPeriod(TString(Value(setting))); } else if (name == Format) { format = TString(Value(setting)); } else if (name == UseSslSetting) { @@ -338,6 +340,7 @@ class TPqDqIntegration: public TDqIntegrationBase { Add(props, EndpointSetting, clusterConfiguration->Endpoint, pos, ctx); Add(props, SharedReading, ToString(clusterConfiguration->SharedReading), pos, ctx); + Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx); Add(props, Format, format, pos, ctx); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp index 5b97002b9ad1..ecb6b28ef938 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp @@ -43,6 +43,7 @@ void TPqConfiguration::Init( clusterSettings.UseSsl = cluster.GetUseSsl(); clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken(); clusterSettings.SharedReading = cluster.GetSharedReading(); + clusterSettings.ReconnectPeriod = cluster.GetReconnectPeriod(); const TString authToken = typeCtx->Credentials->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken()); clusterSettings.AuthToken = authToken; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h index 672effd42fc8..c5ff130fa874 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_settings.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_settings.h @@ -30,6 +30,7 @@ struct TPqClusterConfigurationSettings { TString AuthToken; bool AddBearerToToken = false; bool SharedReading = false; + TString ReconnectPeriod; }; struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher {