Skip to content

Commit

Permalink
[KQP] ANALYZE retries have been added. (ydb-platform#8115)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashandor789 authored Aug 23, 2024
1 parent ef8014a commit eb1276a
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 69 deletions.
112 changes: 51 additions & 61 deletions ydb/core/kqp/gateway/actors/analyze_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "analyze_actor.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/util/ulid.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>
Expand Down Expand Up @@ -42,19 +41,6 @@ void TAnalyzeActor::Bootstrap() {
Become(&TAnalyzeActor::StateWork);
}

// Sends a TEvAnalyzeStatus query carrying the current OperationId to the
// Statistics Aggregator tablet via the per-node pipe cache.
// Aborts if the aggregator tablet id has not been set yet.
void TAnalyzeActor::SendAnalyzeStatus() {
    Y_ABORT_UNLESS(StatisticsAggregatorId.has_value());

    auto statusQuery = std::make_unique<NStat::TEvStatistics::TEvAnalyzeStatus>();
    statusQuery->Record.SetOperationId(OperationId);

    // release() hands the raw event pointer over to the forward envelope.
    auto* forward = new TEvPipeCache::TEvForward(statusQuery.release(), *StatisticsAggregatorId, true);
    Send(MakePipePerNodeCacheID(false), forward);
}

void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);

Expand All @@ -67,50 +53,10 @@ void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, c
<< " , but expected " << OperationId);
}


// TODO Don't send EvAnalyzeStatus, EvAnalyzeResponse is already here
SendAnalyzeStatus();
}

// A scheduled status-check event arrived: query the Statistics Aggregator
// for the operation status again. The event itself carries no payload.
void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx) {
    Y_UNUSED(ev, ctx);
    SendAnalyzeStatus();
}

// Handles the reply to an analyze-status query.
// Depending on the reported status it either completes the promise (with an
// error or with success) and stops the actor, or schedules another status check.
// NOTE(review): this text comes from a rendered diff without +/- markers; the
// success block after the switch looks like it may be the new body of the
// TEvAnalyzeResponse handler rather than part of this function — confirm
// against the real file before relying on it.
void TAnalyzeActor::Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
switch (record.GetStatus()) {
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_UNSPECIFIED: {
// Report an UNEXPECTED issue to the caller and stop the actor.
Promise.SetValue(
NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
YqlIssue(
{}, NYql::TIssuesIds::UNEXPECTED,
TStringBuilder() << "Statistics Aggregator unspecified error"
)
)
);
this->Die(ctx);
return;
}
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION: {
// This status is reported to the caller as success; then the actor stops.
NYql::IKikimrGateway::TGenericResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));

this->Die(ctx);
return;
}
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED: {
// Poll again in 10 seconds.
Schedule(TDuration::Seconds(10), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck());
return;
}
case NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS: {
// Poll again in 5 seconds.
Schedule(TDuration::Seconds(5), new TEvAnalyzePrivate::TEvAnalyzeStatusCheck());
return;
}
}
// Reached only for a status value not matched by any case above:
// complete the promise with success and stop the actor.
NYql::IKikimrGateway::TGenericResult result;
result.SetSuccess();
Promise.SetValue(std::move(result));
this->Die(ctx);
}

void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
Expand Down Expand Up @@ -188,13 +134,56 @@ void TAnalyzeActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr&
}
}

// Computes the delay before the next retry attempt.
// The upper bound is RetryInterval * 2^RetryCount; it is multiplied by a
// randomized factor and the result is limited to [0, MaxBackoffDurationMs].
TDuration TAnalyzeActor::CalcBackoffTime() {
    const ui32 slots = 1 << RetryCount;
    const TDuration upperBound = RetryInterval * slots;

    // Keep the configured ratio inside [0, 1] before using it as jitter width.
    const double ratio = std::clamp(UncertainRatio, 0.0, 1.0);
    // Presumably RandomNumber<double>() yields a value in [0, 1), which puts
    // the factor in [1 - ratio, 1) — TODO confirm against the RNG helper.
    const double jitter = RandomNumber<double>() * ratio;
    const double factor = jitter - ratio + 1.0;

    const double roundedMs = round(upperBound.MilliSeconds() * factor);
    const double backoffMs = std::clamp(roundedMs, 0.0, MaxBackoffDurationMs);
    return TDuration::MilliSeconds(backoffMs);
}

// Reacts to a delivery problem reported by the pipe cache.
// While fewer than MaxRetryCount retries have been made, schedules a
// TEvAnalyzeRetry after a backoff delay; otherwise completes the promise with
// an UNEXPECTED issue and stops the actor.
void TAnalyzeActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx) {
    Y_UNUSED(ev);

    if (RetryCount < MaxRetryCount) {
        // Increment first: CalcBackoffTime() reads the updated RetryCount.
        ++RetryCount;
        Schedule(CalcBackoffTime(), new TEvAnalyzePrivate::TEvAnalyzeRetry());
        return;
    }

    // Retry budget exhausted: give up and report the failure to the caller.
    Promise.SetValue(
        NYql::NCommon::ResultFromError<NYql::IKikimrGateway::TGenericResult>(
            YqlIssue(
                {},
                NYql::TIssuesIds::UNEXPECTED,
                TStringBuilder() << "Can't establish connection with the Statistics Aggregator!"
            )
        )
    );
    this->Die(ctx);
}

// Re-sends the stored ANALYZE request to the Statistics Aggregator.
// Triggered by the TEvAnalyzeRetry event that the delivery-problem handler
// schedules after a backoff delay. The request record is copied from Request,
// and the send asks for delivery tracking.
void TAnalyzeActor::Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx) {
    Y_UNUSED(ev, ctx);

    // Same invariant check as SendAnalyzeStatus(): a retry without a known
    // aggregator id is a logic error, so abort explicitly instead of relying
    // on std::optional::value() throwing std::bad_optional_access.
    Y_ABORT_UNLESS(StatisticsAggregatorId.has_value());

    auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
    analyzeRequest->Record = Request.Record;
    Send(
        MakePipePerNodeCacheID(false),
        new TEvPipeCache::TEvForward(analyzeRequest.release(), *StatisticsAggregatorId, true),
        IEventHandle::FlagTrackDelivery
    );
}

void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TActorContext& ctx) {
Y_ABORT_UNLESS(entry.DomainInfo->Params.HasStatisticsAggregator());

StatisticsAggregatorId = entry.DomainInfo->Params.GetStatisticsAggregator();

auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
auto& record = analyzeRequest->Record;
auto& record = Request.Record;
record.SetOperationId(OperationId);
auto table = record.AddTables();

Expand Down Expand Up @@ -223,7 +212,8 @@ void TAnalyzeActor::SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeC
*table->MutableColumnTags()->Add() = tagByColumnName[columnName];
}

// TODO This request should be retried if StatisticsAggregator fails
auto analyzeRequest = std::make_unique<NStat::TEvStatistics::TEvAnalyze>();
analyzeRequest->Record = Request.Record;
Send(
MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(analyzeRequest.release(), entry.DomainInfo->Params.GetStatisticsAggregator(), true),
Expand Down
27 changes: 19 additions & 8 deletions ydb/core/kqp/gateway/actors/analyze_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>


Expand All @@ -11,11 +12,11 @@ namespace NKikimr::NKqp {

// Events used only by the analyze actor itself; their ids are allocated
// from the TEvents::ES_PRIVATE event space.
// NOTE(review): as rendered here both EvAnalyzeStatusCheck and EvAnalyzeRetry
// are initialised with the same expression and therefore share one id. This
// text comes from a diff view without +/- markers, so presumably one pair is
// the removed side and the other the added side — confirm against the real file.
struct TEvAnalyzePrivate {
enum EEv {
EvAnalyzeStatusCheck = EventSpaceBegin(TEvents::ES_PRIVATE),
EvAnalyzeRetry = EventSpaceBegin(TEvents::ES_PRIVATE),
EvEnd
};

// Payload-free local event scheduled to trigger another status query.
struct TEvAnalyzeStatusCheck : public TEventLocal<TEvAnalyzeStatusCheck, EvAnalyzeStatusCheck> {};
// Payload-free local event scheduled to trigger a re-send of the ANALYZE request.
struct TEvAnalyzeRetry : public TEventLocal<TEvAnalyzeRetry, EvAnalyzeRetry> {};
};

class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
Expand All @@ -28,8 +29,8 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
switch(ev->GetTypeRewrite()) {
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
HFunc(NStat::TEvStatistics::TEvAnalyzeResponse, Handle);
HFunc(NStat::TEvStatistics::TEvAnalyzeStatusResponse, Handle);
HFunc(TEvAnalyzePrivate::TEvAnalyzeStatusCheck, Handle);
HFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
HFunc(TEvAnalyzePrivate::TEvAnalyzeRetry, Handle);
default:
HandleUnexpectedEvent(ev->GetTypeRewrite());
}
Expand All @@ -38,17 +39,18 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
private:
void Handle(NStat::TEvStatistics::TEvAnalyzeResponse::TPtr& ev, const TActorContext& ctx);

void Handle(NStat::TEvStatistics::TEvAnalyzeStatusResponse::TPtr& ev, const TActorContext& ctx);

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);

void Handle(TEvAnalyzePrivate::TEvAnalyzeStatusCheck::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActorContext& ctx);

void Handle(TEvAnalyzePrivate::TEvAnalyzeRetry::TPtr& ev, const TActorContext& ctx);

void HandleUnexpectedEvent(ui32 typeRewrite);

private:
void SendStatisticsAggregatorAnalyze(const NSchemeCache::TSchemeCacheNavigate::TEntry&, const TActorContext&);

void SendAnalyzeStatus();
TDuration CalcBackoffTime();

private:
TString TablePath;
Expand All @@ -58,6 +60,15 @@ class TAnalyzeActor : public NActors::TActorBootstrapped<TAnalyzeActor> {
std::optional<ui64> StatisticsAggregatorId;
TPathId PathId;
TString OperationId;

// for retries
NStat::TEvStatistics::TEvAnalyze Request;
TDuration RetryInterval = TDuration::MilliSeconds(5);
size_t RetryCount = 0;

constexpr static size_t MaxRetryCount = 10;
constexpr static double UncertainRatio = 0.5;
constexpr static double MaxBackoffDurationMs = TDuration::Seconds(15).MilliSeconds();
};

} // end of NKikimr::NKqp

0 comments on commit eb1276a

Please sign in to comment.