Skip to content

Commit

Permalink
YQ-3893 Remove query_id metrics on terminate / to stable (ydb-platfor…
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Dec 11, 2024
1 parent 472861e commit fc43113
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 10 deletions.
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ void TRowDispatcher::UpdateMetrics() {
}
for (const auto& key : toDelete) {
SetQueryMetrics(key, 0, 0, 0);
Metrics.Counters->RemoveSubgroup("query_id", key.QueryId);
AggrStats.LastQueryStats.erase(key);
}
PrintStateToLog();
Expand Down
19 changes: 12 additions & 7 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,27 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
struct TClientsInfo : public IClientDataConsumer {
using TPtr = TIntrusivePtr<TClientsInfo>;

TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, NMonitoring::TDynamicCounterPtr& counters)
TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup)
: Self(self)
, LogPrefix(logPrefix)
, HandlerSettings(handlerSettings)
, Settings(ev->Get()->Record)
, ReadActorId(ev->Sender)
, FilteredDataRate(counters->GetCounter("FilteredDataRate", true))
, RestartSessionByOffsetsByQuery(counters->GetCounter("RestartSessionByOffsetsByQuery", true))
, Counters(counters)
{
if (Settings.HasOffset()) {
NextMessageOffset = Settings.GetOffset();
InitialOffset = Settings.GetOffset();
}
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(readGroup));
FilteredDataRate = topicGroup->GetCounter("FilteredDataRate", true);
RestartSessionByOffsetsByQuery = counters->GetCounter("RestartSessionByOffsetsByQuery", true);
}

~TClientsInfo() {
Counters->RemoveSubgroup("query_id", Settings.GetQueryId());
}

TActorId GetClientId() const override {
Expand Down Expand Up @@ -188,6 +195,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
// Metrics
ui64 InitialOffset = 0;
TStats Stat; // Send (filtered) to read_actor
const ::NMonitoring::TDynamicCounterPtr Counters;
NMonitoring::TDynamicCounters::TCounterPtr FilteredDataRate; // filtered
NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsetsByQuery;
};
Expand Down Expand Up @@ -707,10 +715,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
const TString& format = source.GetFormat();
ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : "raw"};

auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
auto readGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(ReadGroup));
auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, readGroup)}).first->second;

auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive<TClientsInfo>(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first->second;
auto formatIt = FormatHandlers.find(handlerSettings);
if (formatIt == FormatHandlers.end()) {
formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(ActorContext(), FormatHandlerConfig, handlerSettings, Metrics.PartitionGroup)}).first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct TRowDispatcherReadActorMetrics {
}

~TRowDispatcherReadActorMetrics() {
SubGroup->RemoveSubgroup("id", TxId);
SubGroup->RemoveSubgroup("tx_id", TxId);
}

TString TxId;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
}

~TMetrics() {
SubGroup->RemoveSubgroup("id", TxId);
SubGroup->RemoveSubgroup("tx_id", TxId);
}

TString TxId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
}

~TMetrics() {
SubGroup->RemoveSubgroup("id", TxId);
SubGroup->RemoveSubgroup("tx_id", TxId);
}

TString TxId;
Expand Down

0 comments on commit fc43113

Please sign in to comment.