Skip to content

Commit

Permalink
Smaller memory footprint for shards info (ydb-platform#9207)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 13, 2024
1 parent 96cb4ca commit 1960e5f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 14 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableI
TStringBuilder message;
message << "Transaction locks invalidated.";

if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) {
if (auto tableInfoPtr = shardIdToTableInfo.GetPtr(shardId); tableInfoPtr) {
message << " Tables: ";
bool first = true;
for (const auto& path : it->second.Pathes) {
for (const auto& path : tableInfoPtr->Pathes) {
if (!first) {
message << ", ";
first = false;
Expand Down
40 changes: 38 additions & 2 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,46 @@ struct TDeferredEffects {

struct TTableInfo {
bool IsOlap = false;
THashSet<TString> Pathes;
THashSet<TStringBuf> Pathes;
};

using TShardIdToTableInfo = THashMap<ui64, TTableInfo>;

class TShardIdToTableInfo {
public:
const TTableInfo& Get(ui64 shardId) const {
const auto* result = GetPtr(shardId);
AFL_ENSURE(result);
return *result;
}

const TTableInfo* GetPtr(ui64 shardId) const {
auto it = ShardIdToInfo.find(shardId);
return it != std::end(ShardIdToInfo)
? &it->second
: nullptr;
}

void Add(ui64 shardId, bool isOlap, const TString& path) {
const auto [stringsIter, _] = Strings.insert(path);
const TStringBuf pathBuf = *stringsIter;
auto infoIter = ShardIdToInfo.find(shardId);
if (infoIter != std::end(ShardIdToInfo)) {
AFL_ENSURE(infoIter->second.IsOlap == isOlap);
infoIter->second.Pathes.insert(pathBuf);
} else {
ShardIdToInfo.emplace(
shardId,
TTableInfo{
.IsOlap = isOlap,
.Pathes = {pathBuf},
});
}
}

private:
THashMap<ui64, TTableInfo> ShardIdToInfo;
std::unordered_set<TString> Strings;// Pointers aren't invalidated.
};
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
Expand Down
14 changes: 4 additions & 10 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

const auto& task = TasksGraph.GetTask(taskId);
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Pathes.insert(stageInfo.Meta.TablePath);
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
}
} else if (data.GetData().template Is<NKikimrKqp::TEvKqpOutputActorResultInfo>()) {
NKikimrKqp::TEvKqpOutputActorResultInfo info;
Expand All @@ -236,9 +234,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

const auto& task = TasksGraph.GetTask(taskId);
const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Pathes.insert(stageInfo.Meta.TablePath);
ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
}
}
};
Expand Down Expand Up @@ -1980,9 +1976,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
NYql::NDqProto::TDqTask* protoTask = ArenaSerializeTaskToProto(TasksGraph, task, true);
datashardTasks[task.Meta.ShardId].emplace_back(protoTask);

auto& info = (*ShardIdToTableInfo)[task.Meta.ShardId];
info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap);
info.Pathes.insert(stageInfo.Meta.TablePath);
ShardIdToTableInfo->Add(task.Meta.ShardId, stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath);
} else if (stageInfo.Meta.IsSysView()) {
computeTasks.emplace_back(task.Id);
} else {
Expand Down Expand Up @@ -2392,7 +2386,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Effects are only applied when all locks are valid
receivingShardsSet.insert(shardId);

if (HtapTx && ShardIdToTableInfo->at(shardId).IsOlap) {
if (HtapTx && ShardIdToTableInfo->Get(shardId).IsOlap) {
receivingColumnShardsSet.insert(shardId);
}
}
Expand Down

0 comments on commit 1960e5f

Please sign in to comment.