diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 90ca0f90763b..6e446d164a1d 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -40,10 +40,10 @@ NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableI TStringBuilder message; message << "Transaction locks invalidated."; - if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) { + if (auto tableInfoPtr = shardIdToTableInfo.GetPtr(shardId); tableInfoPtr) { message << " Tables: "; bool first = true; - for (const auto& path : it->second.Pathes) { + for (const auto& path : tableInfoPtr->Pathes) { if (!first) { message << ", "; first = false; diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index d5b2d7173ba7..23dc0069576b 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -123,10 +123,46 @@ struct TDeferredEffects { struct TTableInfo { bool IsOlap = false; - THashSet Pathes; + THashSet Pathes; }; -using TShardIdToTableInfo = THashMap; + +class TShardIdToTableInfo { +public: + const TTableInfo& Get(ui64 shardId) const { + const auto* result = GetPtr(shardId); + AFL_ENSURE(result); + return *result; + } + + const TTableInfo* GetPtr(ui64 shardId) const { + auto it = ShardIdToInfo.find(shardId); + return it != std::end(ShardIdToInfo) + ? &it->second + : nullptr; + } + + void Add(ui64 shardId, bool isOlap, const TString& path) { + const auto [stringsIter, _] = Strings.insert(path); + const TStringBuf pathBuf = *stringsIter; + auto infoIter = ShardIdToInfo.find(shardId); + if (infoIter != std::end(ShardIdToInfo)) { + AFL_ENSURE(infoIter->second.IsOlap == isOlap); + infoIter->second.Pathes.insert(pathBuf); + } else { + ShardIdToInfo.emplace( + shardId, + TTableInfo{ + .IsOlap = isOlap, + .Pathes = {pathBuf}, + }); + } + } + +private: + THashMap ShardIdToInfo; + std::unordered_set Strings;// Pointers aren't invalidated. +}; using TShardIdToTableInfoPtr = std::shared_ptr; class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index d139ae51e153..f129d4adfb3a 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -224,9 +224,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAdd(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath); } } else if (data.GetData().template Is()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; @@ -236,9 +234,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAdd(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath); } } }; @@ -1980,9 +1976,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseAdd(task.Meta.ShardId, stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath); } else if (stageInfo.Meta.IsSysView()) { computeTasks.emplace_back(task.Id); } else { @@ -2392,7 +2386,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseat(shardId).IsOlap) { + if (HtapTx && ShardIdToTableInfo->Get(shardId).IsOlap) { receivingColumnShardsSet.insert(shardId); } }