Skip to content

Commit

Permalink
Fixed IsClientStarted
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 7, 2024
1 parent 092c743 commit bcaf4e0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
28 changes: 17 additions & 11 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
public:
TParserHandler(TTopicFormatHandler& self, TVector<TSchemaColumn> parerSchema)
: Self(self)
, LogPrefix(TStringBuilder() << Self.LogPrefix << "TParserHandler: ")
, ParerSchema(parerSchema)
, LogPrefix(TStringBuilder() << Self.LogPrefix << "TParserHandler: ")
{}

public:
Expand All @@ -67,7 +67,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows);

Self.ParsedData.assign(ParerSchema.size(), nullptr);
for (ui64 i = 0; i < ParerSchema.size(); ++i) {
for (size_t i = 0; i < ParerSchema.size(); ++i) {
auto columnStatus = Self.Parser->GetParsedColumn(i);
if (Y_LIKELY(columnStatus.IsSuccess())) {
Self.ParsedData[i] = columnStatus.GetValue();
Expand All @@ -77,14 +77,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

Self.Offsets = &Self.Parser->GetOffsets();

Self.FilterData(numberRows);
}

private:
void OnColumnError(ui64 columnIndex, TStatus status) {
const auto& column = ParerSchema[columnIndex];
LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", status: " << status.ToString());
LOG_ROW_DISPATCHER_WARN("Failed to parse column " << column.ToString() << ", " << status.ToString());

const auto columnIt = Self.ColumnsDesc.find(column.Name);
if (columnIt == Self.ColumnsDesc.end()) {
Expand All @@ -101,8 +100,8 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

private:
TTopicFormatHandler& Self;
const TString LogPrefix;
const TVector<TSchemaColumn> ParerSchema;
const TString LogPrefix;
};

class TClientHandler : public IFilteredDataConsumer {
Expand All @@ -113,8 +112,8 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
TClientHandler(TTopicFormatHandler& self, IClientDataConsumer::TPtr client)
: Self(self)
, Client(client)
, LogPrefix(TStringBuilder() << Self.LogPrefix << "TClientHandler " << Client->GetClientId() << ": ")
, Columns(Client->GetColumns())
, LogPrefix(TStringBuilder() << Self.LogPrefix << "TClientHandler " << Client->GetClientId() << ": ")
, FilteredRow(Columns.size())
{
ColumnsIds.reserve(Columns.size());
Expand All @@ -124,6 +123,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
return Client;
}

bool IsClientStarted() const {
return ClientStarted;
}

TStatus SetupColumns() {
if (Columns.empty()) {
return TStatus(TStatus::EId::INTERNAL_ERROR, "Client should have at least one column in schema");
Expand Down Expand Up @@ -203,7 +206,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

void OnFilterStarted() override {
FilterCompiled = true;
ClientStarted = true;
Client->StartClientSession();
}

Expand Down Expand Up @@ -263,10 +266,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
private:
TTopicFormatHandler& Self;
const IClientDataConsumer::TPtr Client;
const TVector<TSchemaColumn> Columns;
const TString LogPrefix;
TVector<TSchemaColumn> Columns;

TVector<ui64> ColumnsIds;
bool FilterCompiled = false;
bool ClientStarted = false;

// Filtered data
ui64 DataPackerSize = 0;
Expand Down Expand Up @@ -499,8 +503,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

for (const auto& [_, client] : Clinets) {
LOG_ROW_DISPATCHER_TRACE("Commit client " << client->GetClient()->GetClientId() << " offset " << lastOffset);
client->GetClient()->UpdateClinetOffset(lastOffset);
if (client->IsClientStarted()) {
LOG_ROW_DISPATCHER_TRACE("Commit client " << client->GetClient()->GetClientId() << " offset " << lastOffset);
client->GetClient()->UpdateClinetOffset(lastOffset);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class IClientDataConsumer : public TThrRefBase {
virtual NActors::TActorId GetClientId() const = 0;
virtual TMaybe<ui64> GetNextMessageOffset() const = 0;

// After error client moves into invalid state and should be removed
virtual void OnClientError(TStatus status) = 0;

virtual void StartClientSession() = 0;
Expand Down

0 comments on commit bcaf4e0

Please sign in to comment.