Skip to content

Commit

Permalink
Fixed SetupColumns
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Dec 7, 2024
1 parent bcaf4e0 commit b70dbd5
Showing 1 changed file with 6 additions and 12 deletions.
18 changes: 6 additions & 12 deletions ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,9 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
return TStatus(TStatus::EId::INTERNAL_ERROR, "Client should have at least one column in schema");
}

std::map<TString, TColumnDesc> newParserColumns = Self.ColumnsDesc;
for (const auto& column : Columns) {
const auto it = newParserColumns.find(column.Name);
if (it != newParserColumns.end()) {
const auto it = Self.ColumnsDesc.find(column.Name);
if (it != Self.ColumnsDesc.end()) {
if (it->second.TypeYson != column.TypeYson) {
return TStatus(TStatus::EId::SCHEME_ERROR, TStringBuilder() << "Use the same column type in all queries via RD, current type for column `" << column.Name << "` is " << it->second.TypeYson << " (requested type is " << column.TypeYson <<")");
}
Expand All @@ -149,18 +148,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
Self.FreeColumnIds.pop_back();
}

newParserColumns[column.Name] = TColumnDesc{.ColumnId = columnId, .TypeYson = column.TypeYson, .Clients = {Client->GetClientId()}};
Self.ColumnsDesc[column.Name] = TColumnDesc{.ColumnId = columnId, .TypeYson = column.TypeYson, .Clients = {Client->GetClientId()}};
ColumnsIds.emplace_back(columnId);
Self.MaxColumnId = std::max(Self.MaxColumnId, columnId + 1);
}
}

auto status = SetupPacker();
if (status.IsSuccess()) {
Self.ColumnsDesc.swap(newParserColumns);
}

return status;
return SetupPacker();
}

TQueue<std::pair<TRope, TVector<ui64>>> ExtractClientData() {
Expand Down Expand Up @@ -356,6 +350,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
if (!Clinets.insert({client->GetClientId(), clientHandler}).second) {
return TStatus(TStatus::EId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists");
}
Counters.ActiveClients->Inc();

if (auto status = clientHandler->SetupColumns()) {
RemoveClient(client->GetClientId());
Expand All @@ -373,7 +368,6 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
return status.AddParentIssue("Failed to create filter for new client");
}

Counters.ActiveClients->Inc();
return TStatus();
}

Expand All @@ -390,8 +384,8 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
}

const auto client = it->second->GetClient();
Clinets.erase(it);
Counters.ActiveClients->Dec();
Clinets.erase(it);

for (const auto& column : client->GetColumns()) {
const auto columnIt = ColumnsDesc.find(column.Name);
Expand Down

0 comments on commit b70dbd5

Please sign in to comment.