From bcaf4e05bb85473b480f24a17a75502f6748088d Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 7 Dec 2024 15:10:42 +0000 Subject: [PATCH] Fixed IsClientStarted --- .../format_handler/format_handler.cpp | 28 +++++++++++-------- .../format_handler/format_handler.h | 1 - 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp index 86a276d164bb..0ba3d62f2c90 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp @@ -49,8 +49,8 @@ class TTopicFormatHandler : public NActors::TActor, public public: TParserHandler(TTopicFormatHandler& self, TVector parerSchema) : Self(self) - , LogPrefix(TStringBuilder() << Self.LogPrefix << "TParserHandler: ") , ParerSchema(parerSchema) + , LogPrefix(TStringBuilder() << Self.LogPrefix << "TParserHandler: ") {} public: @@ -67,7 +67,7 @@ class TTopicFormatHandler : public NActors::TActor, public LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows); Self.ParsedData.assign(ParerSchema.size(), nullptr); - for (ui64 i = 0; i < ParerSchema.size(); ++i) { + for (size_t i = 0; i < ParerSchema.size(); ++i) { auto columnStatus = Self.Parser->GetParsedColumn(i); if (Y_LIKELY(columnStatus.IsSuccess())) { Self.ParsedData[i] = columnStatus.GetValue(); @@ -77,14 +77,13 @@ class TTopicFormatHandler : public NActors::TActor, public } Self.Offsets = &Self.Parser->GetOffsets(); - Self.FilterData(numberRows); } private: void OnColumnError(ui64 columnIndex, TStatus status) { const auto& column = ParerSchema[columnIndex]; - LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", status: " << status.ToString()); + LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", " << status.ToString()); const auto columnIt = Self.ColumnsDesc.find(column.Name); if (columnIt == Self.ColumnsDesc.end()) { @@ -101,8 +100,8 @@ class TTopicFormatHandler : public NActors::TActor, public private: TTopicFormatHandler& Self; - const TString LogPrefix; const TVector ParerSchema; + const TString LogPrefix; }; class TClientHandler : public IFilteredDataConsumer { @@ -113,8 +112,8 @@ class TTopicFormatHandler : public NActors::TActor, public TClientHandler(TTopicFormatHandler& self, IClientDataConsumer::TPtr client) : Self(self) , Client(client) - , LogPrefix(TStringBuilder() << Self.LogPrefix << "TClientHandler " << Client->GetClientId() << ": ") , Columns(Client->GetColumns()) + , LogPrefix(TStringBuilder() << Self.LogPrefix << "TClientHandler " << Client->GetClientId() << ": ") , FilteredRow(Columns.size()) { ColumnsIds.reserve(Columns.size()); @@ -124,6 +123,10 @@ class TTopicFormatHandler : public NActors::TActor, public return Client; } + bool IsClientStarted() const { + return ClientStarted; + } + TStatus SetupColumns() { if (Columns.empty()) { return TStatus(TStatus::EId::INTERNAL_ERROR, "Client should have at least one column in schema"); @@ -203,7 +206,7 @@ class TTopicFormatHandler : public NActors::TActor, public } void OnFilterStarted() override { - FilterCompiled = true; + ClientStarted = true; Client->StartClientSession(); } @@ -263,10 +266,11 @@ class TTopicFormatHandler : public NActors::TActor, public private: TTopicFormatHandler& Self; const IClientDataConsumer::TPtr Client; + const TVector Columns; const TString LogPrefix; - TVector Columns; + TVector ColumnsIds; - bool FilterCompiled = false; + bool ClientStarted = false; // Filtered data ui64 DataPackerSize = 0; @@ -499,8 +503,10 @@ class TTopicFormatHandler : public NActors::TActor, public } for (const auto& [_, client] : Clinets) { - LOG_ROW_DISPATCHER_TRACE("Commit client " << client->GetClient()->GetClientId() << " offset " << lastOffset); - client->GetClient()->UpdateClinetOffset(lastOffset); + if (client->IsClientStarted()) { + LOG_ROW_DISPATCHER_TRACE("Commit client " << client->GetClient()->GetClientId() << " offset " << lastOffset); + client->GetClient()->UpdateClinetOffset(lastOffset); + } } } diff --git a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h index 365e9e18884b..98823ba0466f 100644 --- a/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h +++ b/ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h @@ -21,7 +21,6 @@ class IClientDataConsumer : public TThrRefBase { virtual NActors::TActorId GetClientId() const = 0; virtual TMaybe GetNextMessageOffset() const = 0; - // After error client moves into invalid state and should be removed virtual void OnClientError(TStatus status) = 0; virtual void StartClientSession() = 0;