From eb1276a4f4530cebeb699a0b027dcc52c9b8abf5 Mon Sep 17 00:00:00 2001 From: pilik Date: Fri, 23 Aug 2024 09:55:43 +0300 Subject: [PATCH] [KQP] ANALYZE retries has been added. (#8115) --- ydb/core/kqp/gateway/actors/analyze_actor.cpp | 112 ++++++++---------- ydb/core/kqp/gateway/actors/analyze_actor.h | 27 +++-- 2 files changed, 70 insertions(+), 69 deletions(-) diff --git a/ydb/core/kqp/gateway/actors/analyze_actor.cpp b/ydb/core/kqp/gateway/actors/analyze_actor.cpp index 2ababa6954bc..e9384a78e8ba 100644 --- a/ydb/core/kqp/gateway/actors/analyze_actor.cpp +++ b/ydb/core/kqp/gateway/actors/analyze_actor.cpp @@ -1,7 +1,6 @@ #include "analyze_actor.h" #include -#include #include #include #include @@ -42,19 +41,6 @@ void TAnalyzeActor::Bootstrap() { Become(&TAnalyzeActor::StateWork); } -void TAnalyzeActor::SendAnalyzeStatus() { - Y_ABORT_UNLESS(StatisticsAggregatorId.has_value()); - - auto getStatus = std::make_unique(); - auto& record = getStatus->Record; - record.SetOperationId(OperationId); - - Send( - MakePipePerNodeCacheID(false), - new TEvPipeCache::TEvForward(getStatus.release(), StatisticsAggregatorId.value(), true) - ); -} - void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ctx); @@ -67,50 +53,10 @@ void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, c << " , but expected " << OperationId); } - - // TODO Don't send EvAnalyzeStatus, EvAnalyzeResponse is already here - SendAnalyzeStatus(); -} - -void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); - Y_UNUSED(ctx); - - SendAnalyzeStatus(); -} - -void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - switch (record.GetStatus()) { - case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_UNSPECIFIED: { - Promise.SetValue( - NYql::NCommon::ResultFromError( - YqlIssue( - {}, NYql::TIssuesIds::UNEXPECTED, - TStringBuilder() << "Statistics Aggregator unspecified error" - ) - ) - ); - this->Die(ctx); - return; - } - case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION: { - NYql::IKikimrGateway::TGenericResult result; - result.SetSuccess(); - Promise.SetValue(std::move(result)); - - this->Die(ctx); - return; - } - case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED: { - Schedule(TDuration::Seconds(10), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck()); - return; - } - case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS: { - Schedule(TDuration::Seconds(5), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck()); - return; - } - } + NYql::IKikimrGateway::TGenericResult result; + result.SetSuccess(); + Promise.SetValue(std::move(result)); + this->Die(ctx); } void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { @@ -188,13 +134,56 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& } } +TDuration TAnalyzeActor::CalcBackoffTime() { + ui32 backoffSlots = 1 << RetryCount; + TDuration maxDuration = RetryInterval * backoffSlots; + + double uncertaintyRatio = std::max(std::min(UncertainRatio, 1.0), 0.0); + double uncertaintyMultiplier = RandomNumber() * uncertaintyRatio - uncertaintyRatio + 1.0; + + double durationMs = round(maxDuration.MilliSeconds() * uncertaintyMultiplier); + durationMs = std::max(std::min(durationMs, MaxBackoffDurationMs), 0.0); + return TDuration::MilliSeconds(durationMs); +} + +void TAnalyzeActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ev, ctx); + + if (RetryCount >= MaxRetryCount) { + Promise.SetValue( + NYql::NCommon::ResultFromError( + YqlIssue( + {}, NYql::TIssuesIds::UNEXPECTED, + TStringBuilder() << "Can't establish connection with the Statistics Aggregator!" + ) + ) + ); + this->Die(ctx); + return; + } + + ++RetryCount; + Schedule(CalcBackoffTime(), new TEvAnalyzePrivate::TEvAnalyzeRetry()); +} + +void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ev, ctx); + + auto analyzeRequest = std::make_unique(); + analyzeRequest->Record = Request.Record; + Send( + MakePipePerNodeCacheID(false), + new TEvPipeCache::TEvForward(analyzeRequest.release(), StatisticsAggregatorId.value(), true), + IEventHandle::FlagTrackDelivery + ); +} + void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) { Y_ABORT_UNLESS(entry.DomainInfo->Params.HasStatisticsAggregator()); StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator(); - auto analyzeRequest = std::make_unique(); - auto& record = analyzeRequest->Record; + auto& record = Request.Record; record.SetOperationId(OperationId); auto table = record.AddTables(); @@ -223,7 +212,8 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC *table->MutableColumnTags()->Add() = tagByColumnName[columnName]; } - // TODO This request should be retried if StatisticsAggregator fails + auto analyzeRequest = std::make_unique(); + analyzeRequest->Record = Request.Record; Send( MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(analyzeRequest.release(), entry.DomainInfo->Params.GetStatisticsAggregator(), true), diff --git a/ydb/core/kqp/gateway/actors/analyze_actor.h b/ydb/core/kqp/gateway/actors/analyze_actor.h index 8ea018c63d94..f59fba90c2b1 100644 --- a/ydb/core/kqp/gateway/actors/analyze_actor.h +++ b/ydb/core/kqp/gateway/actors/analyze_actor.h @@ -3,6 +3,7 @@ #include #include +#include #include @@ -11,11 +12,11 @@ namespace NKikimr::NKqp { struct TEvAnalyzePrivate { enum EEv { - EvAnalyzeStatusCheck = EventSpaceBegin(TEvents::ES_PRIVATE), + EvAnalyzeRetry = EventSpaceBegin(TEvents::ES_PRIVATE), EvEnd }; - struct TEvAnalyzeStatusCheck : public TEventLocal {}; + struct TEvAnalyzeRetry : public TEventLocal {}; }; class TAnalyzeActor : public NActors::TActorBootstrapped { @@ -28,8 +29,8 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { switch(ev->GetTypeRewrite()) { HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); HFunc(NStat::TEvStatistics::TEvAnalyzeResponse, Handle); - HFunc(NStat::TEvStatistics::TEvAnalyzeStatusResponse, Handle); - HFunc(TEvAnalyzePrivate::TEvAnalyzeStatusCheck, Handle); + HFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + HFunc(TEvAnalyzePrivate::TEvAnalyzeRetry, Handle); default: HandleUnexpectedEvent(ev->GetTypeRewrite()); } @@ -38,17 +39,18 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { private: void Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx); - void Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx); - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); - void Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx); + + void Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx); void HandleUnexpectedEvent(ui32 typeRewrite); +private: void SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry&, const TActorContext&); - void SendAnalyzeStatus(); + TDuration CalcBackoffTime(); private: TString TablePath; @@ -58,6 +60,15 @@ class TAnalyzeActor : public NActors::TActorBootstrapped { std::optional StatisticsAggregatorId; TPathId PathId; TString OperationId; + + // for retries + NStat::TEvStatistics::TEvAnalyze Request; + TDuration RetryInterval = TDuration::MilliSeconds(5); + size_t RetryCount = 0; + + constexpr static size_t MaxRetryCount = 10; + constexpr static double UncertainRatio = 0.5; + constexpr static double MaxBackoffDurationMs = TDuration::Seconds(15).MilliSeconds(); }; } // end of NKikimr::NKqp