Skip to content

Commit

Permalink
Reconnect period (ydb-platform#11343)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Nov 7, 2024
1 parent f6cd127 commit e8e7d78
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 1 deletion.
29 changes: 29 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct TTopicSessionMetrics {
InFlyAsyncInputData = SubGroup->GetCounter("InFlyAsyncInputData");
RowsRead = SubGroup->GetCounter("RowsRead", true);
InFlySubscribe = SubGroup->GetCounter("InFlySubscribe");
ReconnectRate = SubGroup->GetCounter("ReconnectRate", true);
}

~TTopicSessionMetrics() {
Expand All @@ -43,6 +44,7 @@ struct TTopicSessionMetrics {
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
::NMonitoring::TDynamicCounters::TCounterPtr RowsRead;
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
};

struct TEvPrivate {
Expand All @@ -56,6 +58,7 @@ struct TEvPrivate {
EvDataFiltered,
EvSendStatistic,
EvStartParsing,
EvReconnectSession,
EvEnd
};
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
Expand All @@ -66,6 +69,7 @@ struct TEvPrivate {
struct TEvSendStatistic : public NActors::TEventLocal<TEvSendStatistic, EvSendStatistic> {};
struct TEvStatus : public NActors::TEventLocal<TEvStatus, EvStatus> {};
struct TEvStartParsing : public NActors::TEventLocal<TEvStartParsing, EvStartParsing> {};
struct TEvReconnectSession : public NActors::TEventLocal<TEvReconnectSession, EvReconnectSession> {};

struct TEvDataFiltered : public NActors::TEventLocal<TEvDataFiltered, EvDataFiltered> {
explicit TEvDataFiltered(ui64 offset)
Expand Down Expand Up @@ -136,6 +140,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
TParserInputType InputType;
};

bool InflightReconnect = false;
TDuration ReconnectPeriod;
const TString TopicPath;
const TString Endpoint;
const TString Database;
Expand Down Expand Up @@ -205,6 +211,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {

void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&);
void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&);
void Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&);
void Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr&);
void Handle(NFq::TEvPrivate::TEvStatus::TPtr&);
void Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr&);
Expand Down Expand Up @@ -375,6 +382,17 @@ void TTopicSession::CreateTopicSession() {
ReadSession = GetTopicClient(sourceParams).CreateReadSession(GetReadSessionSettings(sourceParams));
SubscribeOnNextEvent();
}

if (!InflightReconnect) {
// Use any sourceParams.
const NYql::NPq::NProto::TDqPqTopicSource& sourceParams = Clients.begin()->second.Settings.GetSource();
Y_UNUSED(TDuration::TryParse(sourceParams.GetReconnectPeriod(), ReconnectPeriod));
if (ReconnectPeriod != TDuration::Zero()) {
Metrics.ReconnectRate->Inc();
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
}
InflightReconnect = true;
}
}

void TTopicSession::Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&) {
Expand Down Expand Up @@ -432,6 +450,17 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvStatus::TPtr&) {
}
}

void TTopicSession::Handle(NFq::TEvPrivate::TEvReconnectSession::TPtr&) {
Metrics.ReconnectRate->Inc();
TInstant minTime = GetMinStartingMessageTimestamp();
LOG_ROW_DISPATCHER_DEBUG("Reconnect topic session, Path " << TopicPath
<< ", StartingMessageTimestamp " << minTime
<< ", BufferSize " << BufferSize << ", WithoutConsumer " << Config.GetWithoutConsumer());
StopReadSession();
CreateTopicSession();
Schedule(ReconnectPeriod, new NFq::TEvPrivate::TEvReconnectSession());
}

void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) {
LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, last offset " << ev->Get()->Offset);
for (auto& [actorId, info] : Clients) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/common/proto/gateways_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,8 @@ message TPqClusterConfig {
optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token
optional string DatabaseId = 12;
repeated TAttr Settings = 100;
optional bool SharedReading = 101;
optional bool SharedReading = 101;
optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m
}

message TPqGatewayConfig {
Expand Down
26 changes: 26 additions & 0 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct TEvPrivate {
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),

EvSourceDataReady = EvBegin,
EvReconnectSession,

EvEnd
};
Expand All @@ -83,6 +84,7 @@ struct TEvPrivate {
// Events

struct TEvSourceDataReady : public TEventLocal<TEvSourceDataReady, EvSourceDataReady> {};
struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {};
};

} // namespace
Expand All @@ -98,6 +100,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
InFlySubscribe = task->GetCounter("InFlySubscribe");
AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true);
ReconnectRate = task->GetCounter("ReconnectRate", true);
}

~TMetrics() {
Expand All @@ -110,6 +113,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
};

public:
Expand Down Expand Up @@ -139,6 +143,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
, PqGateway(pqGateway)
{
Y_UNUSED(TDuration::TryParse(SourceParams.GetReconnectPeriod(), ReconnectPeriod));
MetadataFields.reserve(SourceParams.MetadataFieldsSize());
TPqMetaExtractor fieldsExtractor;
for (const auto& fieldName : SourceParams.GetMetadataFields()) {
Expand Down Expand Up @@ -209,6 +214,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
private:
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvSourceDataReady, Handle);
hFunc(TEvPrivate::TEvReconnectSession, Handle);
)

void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {
Expand All @@ -222,6 +228,18 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}

void Handle(TEvPrivate::TEvReconnectSession::TPtr&) {
SRC_LOG_D("SessionId: " << GetSessionId() << ", Reconnect epoch: " << Metrics.ReconnectRate->Val());
Metrics.ReconnectRate->Inc();
if (ReadSession) {
ReadSession->Close(TDuration::Zero());
ReadSession.reset();
ReadyBuffer = std::queue<TReadyBatch>{}; // clear read buffer
}

Schedule(ReconnectPeriod, new TEvPrivate::TEvReconnectSession());
}

// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
std::queue<TReadyBatch> empty;
Expand Down Expand Up @@ -259,6 +277,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
const auto now = TInstant::Now();
MaybeScheduleNextIdleCheck(now);

if (!InflightReconnect && ReconnectPeriod != TDuration::Zero()) {
Metrics.ReconnectRate->Inc();
Schedule(ReconnectPeriod, new TEvPrivate::TEvSourceDataReady());
InflightReconnect = true;
}

i64 usedSpace = 0;
if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
return usedSpace;
Expand Down Expand Up @@ -565,6 +589,8 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
};

private:
bool InflightReconnect = false;
TDuration ReconnectPeriod;
TMetrics Metrics;
const i64 BufferSize;
const THolderFactory& HolderFactory;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/common/yql_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ constexpr TStringBuf WatermarksEnableSetting = "WatermarksEnable";
constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs";
constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod";

} // namespace NYql
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/proto/dq_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ message TDqPqTopicSource {
repeated string ColumnTypes = 13;
string Predicate = 14;
bool SharedReading = 15;
string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m
}

message TDqPqTopicSink {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ class TPqDqIntegration: public TDqIntegrationBase {
srcDesc.SetEndpoint(TString(Value(setting)));
} else if (name == SharedReading) {
sharedReading = FromString<bool>(Value(setting));
} else if (name == ReconnectPeriod) {
srcDesc.SetReconnectPeriod(TString(Value(setting)));
} else if (name == Format) {
format = TString(Value(setting));
} else if (name == UseSslSetting) {
Expand Down Expand Up @@ -338,6 +340,7 @@ class TPqDqIntegration: public TDqIntegrationBase {

Add(props, EndpointSetting, clusterConfiguration->Endpoint, pos, ctx);
Add(props, SharedReading, ToString(clusterConfiguration->SharedReading), pos, ctx);
Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx);
Add(props, Format, format, pos, ctx);


Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void TPqConfiguration::Init(
clusterSettings.UseSsl = cluster.GetUseSsl();
clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken();
clusterSettings.SharedReading = cluster.GetSharedReading();
clusterSettings.ReconnectPeriod = cluster.GetReconnectPeriod();

const TString authToken = typeCtx->Credentials->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken());
clusterSettings.AuthToken = authToken;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/pq/provider/yql_pq_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ struct TPqClusterConfigurationSettings {
TString AuthToken;
bool AddBearerToToken = false;
bool SharedReading = false;
TString ReconnectPeriod;
};

struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher {
Expand Down

0 comments on commit e8e7d78

Please sign in to comment.