Skip to content

Commit

Permalink
Fixed typo
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 10, 2024
1 parent 7a98cad commit 3936113
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
32 changes: 16 additions & 16 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

for (const auto clientId : columnIt->second.Clients) {
const auto clientIt = Self.Clinets.find(clientId);
if (clientIt != Self.Clinets.end()) {
const auto clientIt = Self.Clients.find(clientId);
if (clientIt != Self.Clients.end()) {
clientIt->second->OnClientError(status);
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Counters.ActiveClients->Set(0);

with_lock(Alloc) {
Clinets.clear();
Clients.clear();
}
}

Expand Down Expand Up @@ -327,7 +327,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public

void Handle(NActors::TEvents::TEvPoison::TPtr&) {
with_lock(Alloc) {
Clinets.clear();
Clients.clear();
}
PassAway();
}
Expand All @@ -344,14 +344,14 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
if (Parser) {
Parser->ParseMessages(messages);
ScheduleRefresh();
} else if (!Clinets.empty()) {
} else if (!Clients.empty()) {
FatalError(TStatus::Fail(EStatusId::INTERNAL_ERROR, "Failed to parse messages, expected empty clients set without parser"));
}
}

TQueue<std::pair<TRope, TVector<ui64>>> ExtractClientData(NActors::TActorId clientId) override {
const auto it = Clinets.find(clientId);
if (it == Clinets.end()) {
const auto it = Clients.find(clientId);
if (it == Clients.end()) {
return {};
}
return it->second->ExtractClientData();
Expand All @@ -361,7 +361,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
LOG_ROW_DISPATCHER_DEBUG("Add client with id " << client->GetClientId());

auto clientHandler = MakeIntrusive<TClientHandler>(*this, client);
if (!Clinets.emplace(client->GetClientId(), clientHandler).second) {
if (!Clients.emplace(client->GetClientId(), clientHandler).second) {
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists");
}
Counters.ActiveClients->Inc();
Expand Down Expand Up @@ -392,14 +392,14 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Filters->RemoveFilter(clientId);
}

const auto it = Clinets.find(clientId);
if (it == Clinets.end()) {
const auto it = Clients.find(clientId);
if (it == Clients.end()) {
return;
}

const auto client = it->second->GetClient();
Counters.ActiveClients->Dec();
Clinets.erase(it);
Clients.erase(it);

for (const auto& column : client->GetColumns()) {
const auto columnIt = ColumnsDesc.find(column.Name);
Expand All @@ -420,7 +420,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

bool HasClients() const override {
return !Clinets.empty();
return !Clients.empty();
}

TFormatHandlerStatistic GetStatistics() override {
Expand Down Expand Up @@ -514,17 +514,17 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Filters->FilterData(ParserSchemaIndex, *Offsets, ParsedData, numberRows);
}

for (const auto& [_, client] : Clinets) {
for (const auto& [_, client] : Clients) {
if (client->IsClientStarted()) {
LOG_ROW_DISPATCHER_TRACE("Commit client " << client->GetClient()->GetClientId() << " offset " << lastOffset);
client->GetClient()->UpdateClinetOffset(lastOffset);
client->GetClient()->UpdateClientOffset(lastOffset);
}
}
}

void FatalError(TStatus status) const {
LOG_ROW_DISPATCHER_ERROR("Got fatal error: " << status.GetErrorMessage());
for (const auto& [_, client] : Clinets) {
for (const auto& [_, client] : Clients) {
client->OnClientError(status);
}
}
Expand All @@ -539,7 +539,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
TVector<ui64> FreeColumnIds;
TVector<ui64> ParserSchemaIndex; // Column id to index in parser schema
std::map<TString, TColumnDesc> ColumnsDesc;
std::unordered_map<NActors::TActorId, TClientHandler::TPtr> Clinets;
std::unordered_map<NActors::TActorId, TClientHandler::TPtr> Clients;

// Perser and filters
ITopicParser::TPtr Parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class IClientDataConsumer : public TThrRefBase {

virtual void StartClientSession() = 0;
virtual void AddDataToClient(ui64 offset, ui64 rowSize) = 0;
virtual void UpdateClinetOffset(ui64 offset) = 0;
virtual void UpdateClientOffset(ui64 offset) = 0;
};

class ITopicFormatHandler : public TNonCopyable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class TFormatHadlerFixture : public TBaseFixture {
HasData = true;
}

void UpdateClinetOffset(ui64 offset) override {
void UpdateClientOffset(ui64 offset) override {
UNIT_ASSERT_C(Started, "Unexpected offset for not started session");
UNIT_ASSERT_C(!ExpectedError, "Error is not handled: " << ExpectedError->second << ", client id: " << ClientId);
UNIT_ASSERT_C(!Offsets.empty(), "Unexpected message batch, offset: " << offset << ", client id: " << ClientId);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
Self.SendDataArrived(*this);
}

void UpdateClinetOffset(ui64 offset) override {
LOG_ROW_DISPATCHER_TRACE("UpdateClinetOffset for " << ReadActorId << ", new offset: " << offset);
void UpdateClientOffset(ui64 offset) override {
LOG_ROW_DISPATCHER_TRACE("UpdateClientOffset for " << ReadActorId << ", new offset: " << offset);
if (!NextMessageOffset || *NextMessageOffset < offset + 1) {
NextMessageOffset = offset + 1;
}
Expand Down

0 comments on commit 3936113

Please sign in to comment.