From c3a77e321874801594040735293a560b245502da Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 25 Jan 2024 13:36:48 +0500 Subject: [PATCH] Choose partition for topic split/merge (#1268) --- ydb/core/kafka_proxy/ut/ya.make | 2 +- ydb/core/persqueue/events/internal.h | 18 +- ydb/core/persqueue/partition.cpp | 21 +- ydb/core/persqueue/partition.h | 5 +- ydb/core/persqueue/partition_init.cpp | 2 +- .../persqueue/partition_sourcemanager.cpp | 12 +- ydb/core/persqueue/partition_sourcemanager.h | 6 +- ydb/core/persqueue/partition_write.cpp | 23 +- ydb/core/persqueue/pq_impl.cpp | 35 +- ydb/core/persqueue/pq_impl.h | 4 + ydb/core/persqueue/transaction.cpp | 9 +- .../persqueue/ut/partition_chooser_ut.cpp | 486 +++++++++++++++--- ydb/core/persqueue/ut/partition_ut.cpp | 2 +- ydb/core/persqueue/ut/partitiongraph_ut.cpp | 72 ++- ydb/core/persqueue/ut/pqtablet_mock.h | 1 + ydb/core/persqueue/utils.cpp | 83 ++- ydb/core/persqueue/utils.h | 13 +- ydb/core/persqueue/writer/common.h | 34 ++ .../writer/metadata_initializers.cpp | 14 + ydb/core/persqueue/writer/partition_chooser.h | 9 +- .../writer/partition_chooser_impl.cpp | 429 +--------------- .../persqueue/writer/partition_chooser_impl.h | 198 ++----- ...ion_chooser_impl__abstract_chooser_actor.h | 368 +++++++++++++ ...artition_chooser_impl__old_chooser_actor.h | 163 ++++++ ...partition_chooser_impl__partition_helper.h | 86 ++++ .../partition_chooser_impl__pqrb_helper.h | 65 +++ ...partition_chooser_impl__sm_chooser_actor.h | 272 ++++++++++ .../partition_chooser_impl__table_helper.h | 279 ++++++++++ ydb/core/persqueue/writer/pipe_utils.h | 76 +++ .../persqueue/writer/source_id_encoding.cpp | 63 ++- .../persqueue/writer/source_id_encoding.h | 2 + ydb/core/persqueue/writer/writer.cpp | 26 +- ydb/core/protos/msgbus_pq.proto | 3 + ydb/core/protos/pqconfig.proto | 7 + ydb/core/testlib/basics/ya.make | 3 + ydb/core/testlib/test_pq_client.h | 1 + ydb/core/tx/columnshard/splitter/ut/ya.make | 3 + 37 files changed, 2134 insertions(+), 761 deletions(-) create mode 100644 ydb/core/persqueue/writer/common.h create mode 100644 ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h create mode 100644 ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h create mode 100644 ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h create mode 100644 ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h create mode 100644 ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h create mode 100644 ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h create mode 100644 ydb/core/persqueue/writer/pipe_utils.h diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make index 91266be62c80..361a5117a612 100644 --- a/ydb/core/kafka_proxy/ut/ya.make +++ b/ydb/core/kafka_proxy/ut/ya.make @@ -1,6 +1,6 @@ UNITTEST_FOR(ydb/core/kafka_proxy) -#SIZE(medium) +SIZE(medium) SRCS( ut_kafka_functions.cpp diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index 428f3e9b360b..0bc98a1d5753 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -169,6 +169,8 @@ struct TEvPQ { EvCacheProxyForgetRead, EvGetFullDirectReadData, EvProvideDirectReadInfo, + EvCheckPartitionStatusRequest, + EvCheckPartitionStatusResponse, EvEnd }; @@ -491,12 +493,13 @@ struct TEvPQ { }; struct TEvChangeOwner : public TEventLocal { - explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force) + explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force, const bool registerIfNotExists = true) : Cookie(cookie) , Owner(owner) , PipeClient(pipeClient) , Sender(sender) , Force(force) + , RegisterIfNotExists(registerIfNotExists) {} ui64 Cookie; @@ -504,6 +507,7 @@ struct TEvPQ { TActorId PipeClient; TActorId Sender; bool Force; + bool RegisterIfNotExists; }; struct TEvPipeDisconnected : public TEventLocal { @@ -989,6 +993,18 @@ struct TEvPQ { struct TEvProvideDirectReadInfo : public TEventLocal { }; + struct TEvCheckPartitionStatusRequest : public TEventPB { + TEvCheckPartitionStatusRequest() = default; + + TEvCheckPartitionStatusRequest(ui32 partitionId) { + Record.SetPartition(partitionId); + } + }; + + struct TEvCheckPartitionStatusResponse : public TEventPB { + }; + + }; } //NKikimr diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b996667d5386..b73c10d7ab11 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1796,7 +1796,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf { Config = config; PartitionConfig = GetPartitionConfig(Config, Partition); - PartitionGraph.Rebuild(Config); + PartitionGraph = MakePartitionGraph(Config); TopicConverter = topicConverter; NewPartition = false; @@ -2616,4 +2616,23 @@ void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext Send(ev->Sender, response.Release()); } +void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + + if (Partition != record.GetPartition()) { + LOG_INFO_S( + ctx, NKikimrServices::PERSQUEUE, + "TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." << + " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition << "." + ); + return; + } + + auto response = MakeHolder(); + response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active); + + Send(ev->Sender, response.Release()); +} + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index dd0fa7e36eae..264b78275a8a 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -100,7 +100,7 @@ class TPartition : public TActorBootstrapped { void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp); void ReplyOk(const TActorContext& ctx, const ui64 dst); - void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie); + void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo); void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime); @@ -344,6 +344,7 @@ class TPartition : public TActorBootstrapped { // void DestroyReadSession(const TReadSessionKey& key); void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); TString LogPrefix() const; @@ -481,6 +482,7 @@ class TPartition : public TActorBootstrapped { HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); + HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); @@ -538,6 +540,7 @@ class TPartition : public TActorBootstrapped { HFuncTraced(TEvPQ::TEvTxRollback, Handle); HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle); HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle); + HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle); HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle); HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle); HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 913812a06346..fcab0f62f360 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -180,7 +180,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon case NKikimrProto::NODATA: Partition()->Config = Partition()->TabletConfig; Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition); - Partition()->PartitionGraph.Rebuild(Partition()->Config); + Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config); break; case NKikimrProto::ERROR: diff --git a/ydb/core/persqueue/partition_sourcemanager.cpp b/ydb/core/persqueue/partition_sourcemanager.cpp index 0e8f6d781274..ac5eff21be56 100644 --- a/ydb/core/persqueue/partition_sourcemanager.cpp +++ b/ydb/core/persqueue/partition_sourcemanager.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NPQ { IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId); -bool IsResearchRequires(std::optional node); +bool IsResearchRequires(const TPartitionGraph::Node* node); // // TPartitionSourceManager @@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() { PendingSourceIds = std::move(UnknownSourceIds); - for(const auto* parent : node.value()->HierarhicalParents) { + for(const auto* parent : node->HierarhicalParents) { PendingCookies.insert(++Cookie); TActorId actorId = PartitionRequester(parent->Id, parent->TabletId); @@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const } } -TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const { +const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const { return Partition.PartitionGraph.GetPartition(Partition.Partition); } @@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const { bool TPartitionSourceManager::HasParents() const { auto node = Partition.PartitionGraph.GetPartition(Partition.Partition); - return node && !node.value()->Parents.empty(); + return node && !node->Parents.empty(); } TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) { @@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p return new TSourceIdRequester(parent, partition, tabletId); } -bool IsResearchRequires(std::optional node) { - return node && !node.value()->Parents.empty(); +bool IsResearchRequires(const TPartitionGraph::Node* node) { + return node && !node->Parents.empty(); } NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) { diff --git a/ydb/core/persqueue/partition_sourcemanager.h b/ydb/core/persqueue/partition_sourcemanager.h index a05a51aa9700..6a304e04bf59 100644 --- a/ydb/core/persqueue/partition_sourcemanager.h +++ b/ydb/core/persqueue/partition_sourcemanager.h @@ -12,7 +12,7 @@ class TPartition; class TPartitionSourceManager { private: - using TPartitionNode = std::optional; + using TPartitionNode = TPartitionGraph::Node; public: using TPartitionId = ui32; @@ -96,7 +96,7 @@ class TPartitionSourceManager { private: TPartitionSourceManager& Manager; - TPartitionNode Node; + const TPartitionNode* Node; TSourceIdWriter SourceIdWriter; THeartbeatEmitter HeartbeatEmitter; }; @@ -125,7 +125,7 @@ class TPartitionSourceManager { void FinishBatch(const TActorContext& ctx); bool RequireEnqueue(const TString& sourceId); - TPartitionNode GetPartitionNode() const; + const TPartitionNode* GetPartitionNode() const; TSourceIdStorage& GetSourceIdStorage() const; bool HasParents() const; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 164e21643b99..4c3a78ea0845 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -31,14 +31,18 @@ static const ui32 MAX_INLINE_SIZE = 1000; static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL; -void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) { +void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition); THolder response = MakeHolder(dst); NKikimrClient::TResponse& resp = *response->Response; resp.SetStatus(NMsgBusProxy::MSTATUS_OK); resp.SetErrorCode(NPersQueue::NErrorCode::OK); - resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie); + auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult(); + r->SetOwnerCookie(cookie); + r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active); + r->SetSeqNo(seqNo); + ctx.Send(Tablet, response.Release()); } @@ -146,8 +150,12 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr ev, c auto &owner = ev->Owner; auto it = Owners.find(owner); if (it == Owners.end()) { - Owners[owner]; - it = Owners.find(owner); + if (ev->RegisterIfNotExists) { + Owners[owner]; + it = Owners.find(owner); + } else { + return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered"); + } } if (it->second.NeedResetOwner || ev->Force) { //change owner Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize); @@ -346,10 +354,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) { if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion) ++offset; } else if (response.IsOwnership()) { - const TString& ownerCookie = response.GetOwnership().OwnerCookie; + const auto& r = response.GetOwnership(); + const TString& ownerCookie = r.OwnerCookie; auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie)); if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) { - ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie); + auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first)); + auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo; + ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo); } else { ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already"); } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index a26eef66028a..aa4653b99fd3 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1230,6 +1230,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c } ProcessSourceIdRequests(partitionId); + ProcessCheckPartitionStatusRequests(partitionId); if (allInitialized) { SourceIdRequests.clear(); } @@ -2048,7 +2049,8 @@ void TPersQueue::HandleGetOwnershipRequest(const ui64 responseCookie, const TAct it->second = TPipeInfo::ForOwner(partActor, owner, it->second.ServerActors); InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_GET_OWNERSHIP); - THolder event = MakeHolder(responseCookie, owner, pipeClient, sender, req.GetCmdGetOwnership().GetForce()); + THolder event = MakeHolder(responseCookie, owner, pipeClient, sender, + req.GetCmdGetOwnership().GetForce(), req.GetCmdGetOwnership().GetRegisterIfNotExists()); ctx.Send(partActor, event.Release()); } @@ -3915,6 +3917,37 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) { } } +void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + auto it = Partitions.find(record.GetPartition()); + if (it == Partitions.end()) { + LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition()); + + auto response = THolder(); + response->Record.SetStatus(NKikimrPQ::ETopicPartitionStatus::Deleted); + Send(ev->Sender, response.Release()); + + return; + } + + if (it->second.InitDone) { + Forward(ev, it->second.Actor); + } else { + CheckPartitionStatusRequests[record.GetPartition()].push_back(ev); + } +} + +void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) { + auto sit = CheckPartitionStatusRequests.find(partitionId); + if (sit != CheckPartitionStatusRequests.end()) { + auto it = Partitions.find(partitionId); + for (auto& r : sit->second) { + Forward(r, it->second.Actor); + } + CheckPartitionStatusRequests.erase(partitionId); + } +} + TString TPersQueue::LogPrefix() const { return TStringBuilder() << SelfId() << " "; } diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index dd78e89add25..61eabfaa0b78 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -168,6 +168,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx); void ProcessSourceIdRequests(ui32 partitionId); + void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); + void ProcessCheckPartitionStatusRequests(ui32 partitionId); + TString LogPrefix() const; static constexpr const char * KeyConfig() { return "_config"; } @@ -405,6 +408,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { bool UseMediatorTimeCast = true; THashMap> SourceIdRequests; + THashMap> CheckPartitionStatusRequests; TMaybe TabletGeneration; }; diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index 86a1839e4b87..b613822c8e41 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -147,8 +147,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans TabletConfig = txBody.GetTabletConfig(); BootstrapConfig = txBody.GetBootstrapConfig(); - TPartitionGraph graph; - graph.Rebuild(TabletConfig); + TPartitionGraph graph = MakePartitionGraph(TabletConfig); for (const auto& p : TabletConfig.GetPartitions()) { auto node = graph.GetPartition(p.GetPartitionId()); @@ -156,15 +155,15 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans // Old configuration format without AllPartitions. Split/Merge is not supported. continue; } - if (node.value()->Children.empty()) { - for (const auto* r : node.value()->Parents) { + if (node->Children.empty()) { + for (const auto* r : node->Parents) { if (extractTabletId != r->TabletId) { Senders.insert(r->TabletId); } } } - for (const auto* r : node.value()->Children) { + for (const auto* r : node->Children) { if (r->Children.empty()) { if (extractTabletId != r->TabletId) { Receivers.insert(r->TabletId); diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp index e43f99ad85f2..a66e345dd69a 100644 --- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp +++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp @@ -5,17 +5,43 @@ #include #include +#include using namespace NKikimr::NPQ; static constexpr bool SMEnabled = true; static constexpr bool SMDisabled = false; -NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) { - Cerr << ">>>>> SplitMergeEnabled=" << SplitMergeEnabled << Endl; +using namespace NKikimr; +using namespace NActors; +using namespace NKikimrPQ; + +void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf, + ui32 id, + const std::optional&& boundaryFrom, + const std::optional&& boundaryTo, + std::vector children = {}) { + auto* p = conf.AddPartitions(); + p->SetPartitionId(id); + p->SetTabletId(1000 + id); + p->SetStatus(children.empty() ? NKikimrPQ::ETopicPartitionStatus::Active : NKikimrPQ::ETopicPartitionStatus::Inactive); + if (boundaryFrom) { + p->MutableKeyRange()->SetFromBound(boundaryFrom.value()); + } + if (boundaryTo) { + p->MutableKeyRange()->SetToBound(boundaryTo.value()); + } + for(ui32 c : children) { + p->AddChildPartitionIds(c); + } +} + +NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig0(bool SplitMergeEnabled) { NKikimrSchemeOp::TPersQueueGroupDescription result; NKikimrPQ::TPQTabletConfig* config = result.MutablePQTabletConfig(); + result.SetBalancerTabletID(999); + auto* partitionStrategy = config->MutablePartitionStrategy(); partitionStrategy->SetMinPartitionCount(3); partitionStrategy->SetMaxPartitionCount(SplitMergeEnabled ? 10 : 3); @@ -23,27 +49,17 @@ NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) config->SetTopicName("/Root/topic-1"); config->SetTopicPath("/Root"); - auto* p0 = result.AddPartitions(); - p0->SetPartitionId(0); - p0->SetTabletId(1000); - p0->MutableKeyRange()->SetToBound("C"); - - auto* p1 = result.AddPartitions(); - p1->SetPartitionId(1); - p1->SetTabletId(1001); - p1->MutableKeyRange()->SetFromBound("C"); - p1->MutableKeyRange()->SetToBound("F"); + return result; +} - auto* p2 = result.AddPartitions(); - p2->SetPartitionId(2); - p2->SetTabletId(1002); - p2->MutableKeyRange()->SetFromBound("F"); +NKikimrSchemeOp::TPersQueueGroupDescription CreateConfig(bool SplitMergeEnabled) { + NKikimrSchemeOp::TPersQueueGroupDescription result = CreateConfig0(SplitMergeEnabled); - auto* p3 = result.AddPartitions(); - p3->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Inactive); - p3->SetPartitionId(3); - p3->SetTabletId(1003); - p3->MutableKeyRange()->SetFromBound("D"); + AddPartition(result, 0, {}, "C"); + AddPartition(result, 1, "C", "F"); + AddPartition(result, 2, "F", "Z"); + AddPartition(result, 3, "Z", {}, {4}); + AddPartition(result, 4, "Z", {}); return result; } @@ -172,14 +188,17 @@ NPersQueue::TTopicConverterPtr CreateTopicConverter() { return NPersQueue::TTopicNameConverter::ForFirstClass(CreateConfig(SMDisabled).GetPQTabletConfig()); } -TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMergeEnabled, const TString& sourceId, std::optional preferedPartition = std::nullopt) { - NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); +TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + const TString& sourceId, + std::optional preferedPartition = std::nullopt) { + NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); TWriteSessionMock* mock = new TWriteSessionMock(); NActors::TActorId parentId = server.GetRuntime()->Register(mock); - server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActor(parentId, - CreateConfig(spliMergeEnabled), + server.GetRuntime()->Register(NKikimr::NPQ::CreatePartitionChooserActorM(parentId, + config, fullConverter, sourceId, preferedPartition, @@ -188,15 +207,55 @@ TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMer mock->Promise.GetFuture().GetValueSync(); return mock; + +} + +TWriteSessionMock* ChoosePartition(NPersQueue::TTestServer& server, bool spliMergeEnabled, const TString& sourceId, std::optional preferedPartition = std::nullopt) { + auto config = CreateConfig(spliMergeEnabled); + return ChoosePartition(server, config, sourceId, preferedPartition); } -void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId) { +void InitTable(NPersQueue::TTestServer& server) { + class Initializer: public TActorBootstrapped { + public: + Initializer(NThreading::TPromise& promise) + : Promise(promise) {} + + void Bootstrap(const TActorContext& ctx) { + Become(&TThis::StateWork); + ctx.Send( + NKikimr::NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), + new NKikimr::NMetadata::NProvider::TEvPrepareManager(NKikimr::NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) + ); + } + + private: + void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext&) { + Promise.SetValue(); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); + } + } + + private: + NThreading::TPromise& Promise; + }; + + NThreading::TPromise promise = NThreading::NewPromise(); + server.GetRuntime()->Register(new Initializer(promise)); + promise.GetFuture().GetValueSync(); +} + +void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId, ui64 seqNo = 0) { + InitTable(server); + const auto& pqConfig = server.CleverServer->GetRuntime()->GetAppData().PQConfig; auto tableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping : ESourceIdTableGeneration::SrcIdMeta2; - Cerr << ">>>>> pqConfig.GetTopicsAreFirstClassCitizen()=" << pqConfig.GetTopicsAreFirstClassCitizen() << Endl; - NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); NKikimr::NPQ::NSourceIdEncoding::TEncodedSourceId encoded = NSourceIdEncoding::EncodeSrcId( fullConverter->GetTopicForSrcIdHash(), sourceId, tableGeneration @@ -205,64 +264,361 @@ void WriteToTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 TString query; if (pqConfig.GetTopicsAreFirstClassCitizen()) { query = TStringBuilder() << "--!syntax_v1\n" - "UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES " + "UPSERT INTO `//Root/.metadata/TopicPartitionsMapping` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES " "(" << encoded.KeysHash << ", \"" << fullConverter->GetClientsideName() << "\", \"" << encoded.EscapedSourceId << "\", "<< TInstant::Now().MilliSeconds() << ", " - << TInstant::Now().MilliSeconds() << ", " << partitionId << ");"; + << TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << ");"; } else { query = TStringBuilder() << "--!syntax_v1\n" - "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES (" + "UPSERT INTO `/Root/PQ/SourceIdMeta2` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES (" << encoded.Hash << ", \"" << fullConverter->GetClientsideName() << "\", \"" << encoded.EscapedSourceId << "\", " - << TInstant::Now().MilliSeconds() << ", " << TInstant::Now().MilliSeconds() << ", " << partitionId << "); "; + << TInstant::Now().MilliSeconds() << ", " << TInstant::Now().MilliSeconds() << ", " << partitionId << ", " << seqNo << "); "; } Cerr << "Run query:\n" << query << Endl; auto scResult = server.AnnoyingClient->RunYqlDataQuery(query); } -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_Test) { - NPersQueue::TTestServer server{}; +TMaybe SelectTable(NPersQueue::TTestServer& server, const TString& sourceId) { + const auto& pqConfig = server.CleverServer->GetRuntime()->GetAppData().PQConfig; + auto tableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping + : ESourceIdTableGeneration::SrcIdMeta2; - { - auto r = ChoosePartition(server, SMEnabled, "A_Source"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - auto r = ChoosePartition(server, SMEnabled, "Y_Source"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); + NPersQueue::TTopicConverterPtr fullConverter = CreateTopicConverter(); + NKikimr::NPQ::NSourceIdEncoding::TEncodedSourceId encoded = NSourceIdEncoding::EncodeSrcId( + fullConverter->GetTopicForSrcIdHash(), sourceId, tableGeneration + ); + + TString query; + if (pqConfig.GetTopicsAreFirstClassCitizen()) { + query = TStringBuilder() << "--!syntax_v1\n" + "SELECT Partition, SeqNo " + "FROM `//Root/.metadata/TopicPartitionsMapping` " + "WHERE Hash = " << encoded.KeysHash << + " AND Topic = \"" << fullConverter->GetClientsideName() << "\"" << + " AND ProducerId = \"" << encoded.EscapedSourceId << "\""; + } else { + query = TStringBuilder() << "--!syntax_v1\n" + "SELECT Partition, SeqNo " + "FROM `/Root/PQ/SourceIdMeta2` " + "WHERE Hash = " << encoded.KeysHash << + " AND Topic = \"" << fullConverter->GetClientsideName() << "\"" << + " AND SourceId = \"" << encoded.EscapedSourceId << "\""; } - { - // Define partition for sourceId that is not in partition boundary - WriteToTable(server, "X_Source_w_0", 0); - auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0"); - UNIT_ASSERT(r->Error); + Cerr << "Run query:\n" << query << Endl; + return server.AnnoyingClient->RunYqlDataQuery(query); +} + +void AssertTableEmpty(NPersQueue::TTestServer& server, const TString& sourceId) { + auto result = SelectTable(server, sourceId); + + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 0, "Table must not contains SourceId='" << sourceId << "'"); +} + +void AssertTable(NPersQueue::TTestServer& server, const TString& sourceId, ui32 partitionId, ui64 seqNo) { + auto result = SelectTable(server, sourceId); + + UNIT_ASSERT(result); + UNIT_ASSERT_VALUES_EQUAL_C(result->RowsCount(), 1, "Table must contains SourceId='" << sourceId << "'"); + + NYdb::TResultSetParser parser(*result); + UNIT_ASSERT(parser.TryNextRow()); + NYdb::TValueParser p(parser.GetValue(0)); + NYdb::TValueParser s(parser.GetValue(1)); + UNIT_ASSERT_VALUES_EQUAL(*p.GetOptionalUint32().Get(), partitionId); + UNIT_ASSERT_VALUES_EQUAL(*s.GetOptionalUint64().Get(), seqNo); +} + +class TPQTabletMock: public TActor { +public: + TPQTabletMock(ETopicPartitionStatus status, std::optional seqNo) + : TActor(&TThis::StateMockWork) + , Status(status) + , SeqNo(seqNo) { } - { - // Redefine partition for sourceId. Check that partition changed; - WriteToTable(server, "X_Source_w_0", 2); - auto r = ChoosePartition(server, SMEnabled, "X_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); + +private: + void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx) { + auto response = MakeHolder(); + + response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); + + if (ev->Get()->Record.GetPartitionRequest().HasCmdGetOwnership()) { + auto& o = ev->Get()->Record.GetPartitionRequest().GetCmdGetOwnership(); + if (o.GetRegisterIfNotExists() || SeqNo) { + auto* cmd = response->Record.MutablePartitionResponse()->MutableCmdGetOwnershipResult(); + cmd->SetOwnerCookie("ower_cookie"); + cmd->SetStatus(Status); + cmd->SetSeqNo(SeqNo.value_or(0)); + } else { + response->Record.SetErrorCode(NPersQueue::NErrorCode::SOURCEID_DELETED); + } + } + + auto* sn = response->Record.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo(); + sn->SetSeqNo(SeqNo.value_or(0)); + sn->SetState(SeqNo ? NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED : NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION); + + ctx.Send(ev->Sender, response.Release()); } - { - // Redefine partition for sourceId to inactive partition. Select new partition use partition boundary. - WriteToTable(server, "A_Source_w_0", 3); - auto r = ChoosePartition(server, SMEnabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); + + void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) { + auto response = MakeHolder(); + + ctx.Send(ev->Sender, response.Release()); } - { - // Use prefered partition, but sourceId not in partition boundary - auto r = ChoosePartition(server, SMEnabled, "A_Source_1", 1); - UNIT_ASSERT(r->Error); + + STFUNC(StateMockWork) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(TEvPersQueue::TEvRequest, Handle); + HFunc(TEvPQ::TEvCheckPartitionStatusRequest, Handle); + } } + +private: + ETopicPartitionStatus Status; + std::optional SeqNo; +}; + + +TPQTabletMock* CreatePQTabletMock(NPersQueue::TTestServer& server, ui32 partitionId, ETopicPartitionStatus status, std::optional seqNo = std::nullopt) { + TPQTabletMock* mock = new TPQTabletMock(status, seqNo); + auto actorId = server.GetRuntime()->Register(mock); + NKikimr::NTabletPipe::NTest::TPipeMock::Register(partitionId + 1000, actorId); + return mock; } -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { +NPersQueue::TTestServer CreateServer() { NPersQueue::TTestServer server{}; server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetTopicsAreFirstClassCitizen(true); server.CleverServer->GetRuntime()->GetAppData().PQConfig.SetUseSrcIdMetaMappingInFirstClass(true); + server.EnableLogs({NKikimrServices::PQ_PARTITION_CHOOSER}, NActors::NLog::PRI_TRACE); + + return server; +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_NewSourceId_Test) { + // We check the scenario when writing is performed with a new SourceID + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); + + auto r = ChoosePartition(server, config, "A_Source_0"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); + AssertTable(server, "A_Source_0", 0, 0); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryTrue_Test) { + // We check the partition selection scenario when we have already written with the + // specified SourceID, the partition to which we wrote is active, and the partition + // boundaries coincide with the distribution. + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + + WriteToTable(server, "A_Source_1", 0, 11); + auto r = ChoosePartition(server, config, "A_Source_1"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); + AssertTable(server, "A_Source_1", 0, 11); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionActive_BoundaryFalse_Test) { + // We check the partition selection scenario when we have already written with the + // specified SourceID, the partition to which we wrote is active, and the partition + // boundaries is not coincide with the distribution. + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + + WriteToTable(server, "A_Source_2", 1, 13); + auto r = ChoosePartition(server, config, "A_Source_2"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); + AssertTable(server, "A_Source_2", 1, 13); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionInactive_0_Test) { + // Boundary partition is inactive. It is configuration error - required reload of configuration. + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + + WriteToTable(server, "A_Source_3", 0, 13); + auto r = ChoosePartition(server, config, "A_Source_3"); + + UNIT_ASSERT(r->Error); + AssertTable(server, "A_Source_3", 0, 13); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionInactive_1_Test) { + // Boundary partition is inactive. It is configuration error - required reload of configuration. + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); // Active but not written + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Inactive); + + WriteToTable(server, "A_Source_4", 1, 13); + auto r = ChoosePartition(server, config, "A_Source_4"); + + UNIT_ASSERT(r->Error); + AssertTable(server, "A_Source_4", 1, 13); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_PartitionNotExists_Test) { + // Old partition alredy deleted. Choose new partition by boundary and save SeqNo + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 1, {}, "F"); + AddPartition(config, 2, "F", {}); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); + + WriteToTable(server, "A_Source_5", 0, 13); + auto r = ChoosePartition(server, config, "A_Source_5"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 13); + AssertTable(server, "A_Source_5", 1, 13); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_Test) { + // Old partition exists. Receive SeqNo from the partition. Choose new partition by boundary and save SeqNo + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, {}, {1, 2}); + AddPartition(config, 1, {}, "F"); + AddPartition(config, 2, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive, 157); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); + + WriteToTable(server, "A_Source_6", 0, 13); + auto r = ChoosePartition(server, config, "A_Source_6"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 157); + AssertTable(server, "A_Source_6", 1, 157); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_NotWritten_Test) { + // Old partition exists but not written. Choose new partition by boundary and save SeqNo + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, {}, {1, 2}); + AddPartition(config, 1, {}, "F"); + AddPartition(config, 2, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); + + WriteToTable(server, "A_Source_7", 0, 13); + auto r = ChoosePartition(server, config, "A_Source_7"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 13); + AssertTable(server, "A_Source_7", 1, 13); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_SourceId_OldPartitionExists_NotBoundary_Test) { + // Old partition exists. Receive SeqNo from the partition. Choose new partition from children and save SeqNo + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F", { 2}); + AddPartition(config, 1, "F", {}); + AddPartition(config, 2, {}, "F"); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive, 157); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); + + WriteToTable(server, "Y_Source_7", 0, 13); + auto r = ChoosePartition(server, config, "Y_Source_7"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->SeqNo, 157); + AssertTable(server, "Y_Source_7", 2, 157); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_Active_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + + auto r = ChoosePartition(server, config, "", 0); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveConfig_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, {}, {1}); + AddPartition(config, 1, {}, {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + + auto r = ChoosePartition(server, config, "", 0); + + UNIT_ASSERT(r->Error); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_InactiveActor_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(true); + AddPartition(config, 0, {}, "F"); + AddPartition(config, 1, "F", {}); + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Inactive); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + + auto r = ChoosePartition(server, config, "", 0); + + UNIT_ASSERT(r->Error); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { + NPersQueue::TTestServer server = CreateServer(); + + CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); + CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); { auto r = ChoosePartition(server, SMDisabled, "A_Source"); diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 2ea5da2d6a67..eba55773cdcf 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -514,7 +514,7 @@ void TPartitionFixture::SendWrite(const ui64 cookie, const ui64 messageNo, const void TPartitionFixture::SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force) { - auto event = MakeHolder(cookie, owner, pipeClient, Ctx->Edge, force); + auto event = MakeHolder(cookie, owner, pipeClient, Ctx->Edge, force, true); Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release())); } diff --git a/ydb/core/persqueue/ut/partitiongraph_ut.cpp b/ydb/core/persqueue/ut/partitiongraph_ut.cpp index 0003f18cec10..9067d76fece7 100644 --- a/ydb/core/persqueue/ut/partitiongraph_ut.cpp +++ b/ydb/core/persqueue/ut/partitiongraph_ut.cpp @@ -44,45 +44,37 @@ Y_UNIT_TEST_SUITE(TPartitionGraphTest) { p5->AddParentPartitionIds(4); TPartitionGraph graph; - graph.Rebuild(config); - - const auto n0o = graph.GetPartition(0); - const auto n1o = graph.GetPartition(1); - const auto n2o = graph.GetPartition(2); - const auto n3o = graph.GetPartition(3); - const auto n4o = graph.GetPartition(4); - const auto n5o = graph.GetPartition(5); - - UNIT_ASSERT(n0o); - UNIT_ASSERT(n1o); - UNIT_ASSERT(n2o); - UNIT_ASSERT(n3o); - UNIT_ASSERT(n4o); - UNIT_ASSERT(n5o); - - auto& n0 = *n0o.value(); - auto& n1 = *n1o.value(); - auto& n2 = *n2o.value(); - auto& n3 = *n3o.value(); - auto& n4 = *n4o.value(); - auto& n5 = *n5o.value(); - - - UNIT_ASSERT_EQUAL(n0.Parents.size(), 0); - UNIT_ASSERT_EQUAL(n0.Children.size(), 0); - UNIT_ASSERT_EQUAL(n0.HierarhicalParents.size(), 0); - - UNIT_ASSERT_EQUAL(n1.Parents.size(), 0); - UNIT_ASSERT_EQUAL(n1.Children.size(), 1); - UNIT_ASSERT_EQUAL(n1.HierarhicalParents.size(), 0); - - UNIT_ASSERT_EQUAL_C(n5.Parents.size(), 2, "n5.Parents.size() == " << n5.Parents.size() << " but expected 2"); - UNIT_ASSERT_EQUAL_C(n5.Children.size(), 0, "n5.Children.size() == " << n5.Children.size() << " but expected 0"); - UNIT_ASSERT_EQUAL_C(n5.HierarhicalParents.size(), 4, "n5.HierarhicalParents.size() == " << n5.HierarhicalParents.size() << " but expected 4"); - UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n0) == n5.HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n1) != n5.HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n2) != n5.HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n3) != n5.HierarhicalParents.end()); - UNIT_ASSERT(std::find(n5.HierarhicalParents.cbegin(), n5.HierarhicalParents.cend(), &n4) != n5.HierarhicalParents.end()); + graph = std::move(MakePartitionGraph(config)); + + const auto n0 = graph.GetPartition(0); + const auto n1 = graph.GetPartition(1); + const auto n2 = graph.GetPartition(2); + const auto n3 = graph.GetPartition(3); + const auto n4 = graph.GetPartition(4); + const auto n5 = graph.GetPartition(5); + + UNIT_ASSERT(n0); + UNIT_ASSERT(n1); + UNIT_ASSERT(n2); + UNIT_ASSERT(n3); + UNIT_ASSERT(n4); + UNIT_ASSERT(n5); + + UNIT_ASSERT_VALUES_EQUAL(n0->Parents.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(n0->Children.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(n0->HierarhicalParents.size(), 0); + + UNIT_ASSERT_VALUES_EQUAL(n1->Parents.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(n1->Children.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(n1->HierarhicalParents.size(), 0); + + UNIT_ASSERT_VALUES_EQUAL(n5->Parents.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(n5->Children.size(), 0u); + UNIT_ASSERT_VALUES_EQUAL(n5->HierarhicalParents.size(), 4); + UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n0) == n5->HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n1) != n5->HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n2) != n5->HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n3) != n5->HierarhicalParents.end()); + UNIT_ASSERT(std::find(n5->HierarhicalParents.cbegin(), n5->HierarhicalParents.cend(), n4) != n5->HierarhicalParents.end()); } } diff --git a/ydb/core/persqueue/ut/pqtablet_mock.h b/ydb/core/persqueue/ut/pqtablet_mock.h index 0fd2cdc73485..99e8d12ad12f 100644 --- a/ydb/core/persqueue/ut/pqtablet_mock.h +++ b/ydb/core/persqueue/ut/pqtablet_mock.h @@ -59,6 +59,7 @@ class TPQTabletMock : public TActor, public NTabletFlatExecutor:: switch (ev->GetTypeRewrite()) { HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + // TX HFunc(TEvTxProcessing::TEvReadSet, Handle); HFunc(TEvTxProcessing::TEvReadSetAck, Handle); HFunc(TEvPQTablet::TEvSendReadSet, Handle); diff --git a/ydb/core/persqueue/utils.cpp b/ydb/core/persqueue/utils.cpp index c4a934454af2..0c133e998bd9 100644 --- a/ydb/core/persqueue/utils.cpp +++ b/ydb/core/persqueue/utils.cpp @@ -54,29 +54,69 @@ const NKikimrPQ::TPQTabletConfig::TPartition* GetPartitionConfig(const NKikimrPQ return nullptr; } -void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) { - Partitions.clear(); +TPartitionGraph::TPartitionGraph() { +} + +TPartitionGraph::TPartitionGraph(std::unordered_map&& partitions) { + Partitions = std::move(partitions); +} + +const TPartitionGraph::Node* TPartitionGraph::GetPartition(ui32 id) const { + auto it = Partitions.find(id); + if (it == Partitions.end()) { + return nullptr; + } + return &it->second; +} + +std::set TPartitionGraph::GetActiveChildren(ui32 id) const { + const auto* p = GetPartition(id); + if (!p) { + return {}; + } + + std::deque queue; + queue.push_back(p); + + std::set result; + while(!queue.empty()) { + const auto* n = queue.front(); + queue.pop_front(); - if (0 == config.AllPartitionsSize()) { - return; + if (n->Children.empty()) { + result.emplace(n->Id); + } else { + queue.insert(queue.end(), n->Children.begin(), n->Children.end()); + } + } + + return result; +} + +template +std::unordered_map BuildGraph(const ::google::protobuf::RepeatedPtrField& partitions) { + std::unordered_map result; + + if (0 == partitions.size()) { + return result; } - for (const auto& p : config.GetAllPartitions()) { - Partitions.emplace(p.GetPartitionId(), p); + for (const auto& p : partitions) { + result.emplace(p.GetPartitionId(), TPartitionGraph::Node(p.GetPartitionId(), p.GetTabletId())); } - std::deque queue; - for(const auto& p : config.GetAllPartitions()) { - auto& node = Partitions[p.GetPartitionId()]; + std::deque queue; + for(const auto& p : partitions) { + auto& node = result[p.GetPartitionId()]; node.Children.reserve(p.ChildPartitionIdsSize()); for (auto id : p.GetChildPartitionIds()) { - node.Children.push_back(&Partitions[id]); + node.Children.push_back(&result[id]); } node.Parents.reserve(p.ParentPartitionIdsSize()); for (auto id : p.GetParentPartitionIds()) { - node.Parents.push_back(&Partitions[id]); + node.Parents.push_back(&result[id]); } if (p.GetParentPartitionIds().empty()) { @@ -104,19 +144,22 @@ void TPartitionGraph::Rebuild(const NKikimrPQ::TPQTabletConfig& config) { queue.insert(queue.end(), n->Children.begin(), n->Children.end()); } } + + return result; } -std::optional TPartitionGraph::GetPartition(ui32 id) const { - auto it = Partitions.find(id); - if (it == Partitions.end()) { - return std::nullopt; - } - return std::optional(&it->second); + +TPartitionGraph::Node::Node(ui32 id, ui64 tabletId) + : Id(id) + , TabletId(tabletId) { +} + +TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config) { + return TPartitionGraph(BuildGraph(config.GetAllPartitions())); } -TPartitionGraph::Node::Node(const NKikimrPQ::TPQTabletConfig::TPartition& config) { - Id = config.GetPartitionId(); - TabletId = config.GetTabletId(); +TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config) { + return TPartitionGraph(BuildGraph(config.GetPartitions())); } } // NKikimr::NPQ diff --git a/ydb/core/persqueue/utils.h b/ydb/core/persqueue/utils.h index daa7f20d0a45..65d76ada400e 100644 --- a/ydb/core/persqueue/utils.h +++ b/ydb/core/persqueue/utils.h @@ -1,5 +1,6 @@ #pragma once +#include #include namespace NKikimr::NPQ { @@ -22,7 +23,7 @@ class TPartitionGraph { Node() = default; Node(Node&&) = default; - Node(const NKikimrPQ::TPQTabletConfig::TPartition& config); + Node(ui32 id, ui64 tabletId); ui32 Id; ui64 TabletId; @@ -35,11 +36,17 @@ class TPartitionGraph { std::set HierarhicalParents; }; - void Rebuild(const NKikimrPQ::TPQTabletConfig& config); + TPartitionGraph(); + TPartitionGraph(std::unordered_map&& partitions); + + const Node* GetPartition(ui32 id) const; + std::set GetActiveChildren(ui32 id) const; - std::optional GetPartition(ui32 id) const; private: std::unordered_map Partitions; }; +TPartitionGraph MakePartitionGraph(const NKikimrPQ::TPQTabletConfig& config); +TPartitionGraph MakePartitionGraph(const NKikimrSchemeOp::TPersQueueGroupDescription& config); + } // NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/common.h b/ydb/core/persqueue/writer/common.h new file mode 100644 index 000000000000..2d5a67c71faf --- /dev/null +++ b/ydb/core/persqueue/writer/common.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include + +#include + +namespace NKikimr::NPQ { + +inline bool BasicCheck(const NKikimrClient::TResponse& response, TString& error, bool mustHaveResponse = true) { + if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { + error = TStringBuilder() << "Status is not ok" + << ": status# " << static_cast(response.GetStatus()); + return false; + } + + if (response.GetErrorCode() != NPersQueue::NErrorCode::OK) { + error = TStringBuilder() << "Error code is not ok" + << ": code# " << static_cast(response.GetErrorCode()); + return false; + } + + if (mustHaveResponse && !response.HasPartitionResponse()) { + error = "Absent partition response"; + return false; + } + + return true; +} + + +} // namespace NKikimr::NPQ + diff --git a/ydb/core/persqueue/writer/metadata_initializers.cpp b/ydb/core/persqueue/writer/metadata_initializers.cpp index 178a669d522c..dff6ade3ada7 100644 --- a/ydb/core/persqueue/writer/metadata_initializers.cpp +++ b/ydb/core/persqueue/writer/metadata_initializers.cpp @@ -60,6 +60,20 @@ void TSrcIdMetaInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr cont } result.emplace_back(new NInitializer::TGenericTableModifier(request, "create")); + + { + Ydb::Table::AlterTableRequest request; + request.set_session_id(""); + request.set_path(tablePath); + + { + auto& column = *request.add_add_columns(); + column.set_name("SeqNo"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + } + + result.emplace_back(new NInitializer::TGenericTableModifier(request, "add_column_SeqNo")); + } } result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(tablePath, "acl")); controller->OnPreparationFinished(result); diff --git a/ydb/core/persqueue/writer/partition_chooser.h b/ydb/core/persqueue/writer/partition_chooser.h index 9ab9eee1cbbf..a4d19c64addf 100644 --- a/ydb/core/persqueue/writer/partition_chooser.h +++ b/ydb/core/persqueue/writer/partition_chooser.h @@ -24,13 +24,17 @@ struct TEvPartitionChooser { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_PARTITION_CHOOSER)"); struct TEvChooseResult: public TEventLocal { - TEvChooseResult(ui32 partitionId, ui64 tabletId) + TEvChooseResult(ui32 partitionId, ui64 tabletId, const TString& ownerCookie, std::optional seqNo) : PartitionId(partitionId) - , TabletId(tabletId) { + , TabletId(tabletId) + , OwnerCookie(ownerCookie) + , SeqNo(seqNo) { } ui32 PartitionId; ui64 TabletId; + TString OwnerCookie; + std::optional SeqNo; }; struct TEvChooseError: public TEventLocal { @@ -63,6 +67,7 @@ class IPartitionChooser { virtual const TPartitionInfo* GetPartition(const TString& sourceId) const = 0; virtual const TPartitionInfo* GetPartition(ui32 partitionId) const = 0; + virtual const TPartitionInfo* GetRandomPartition() const = 0; }; std::shared_ptr CreatePartitionChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config, bool withoutHash = false); diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.cpp b/ydb/core/persqueue/writer/partition_chooser_impl.cpp index d8102d2f32de..2c7a3f64996b 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.cpp +++ b/ydb/core/persqueue/writer/partition_chooser_impl.cpp @@ -1,5 +1,4 @@ #include "partition_chooser_impl.h" -#include "ydb/public/sdk/cpp/client/ydb_proto/accessor.h" #include #include @@ -32,424 +31,6 @@ TString TMd5Converter::operator()(const TString& sourceId) const { return AsKeyBound(Hash(sourceId)); } - -// -// TPartitionChooserActor -// - -TPartitionChooserActor::TPartitionChooserActor(TActorId parent, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - std::shared_ptr& chooser, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional preferedPartition) - : Parent(parent) - , FullConverter(fullConverter) - , SourceId(sourceId) - , PreferedPartition(preferedPartition) - , Chooser(chooser) - , SplitMergeEnabled_(SplitMergeEnabled(config.GetPQTabletConfig())) - , Partition(nullptr) - , BalancerTabletId(config.GetBalancerTabletID()) { -} - -void TPartitionChooserActor::Bootstrap(const TActorContext& ctx) { - const auto& pqConfig = AppData(ctx)->PQConfig; - - NeedUpdateTable = (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()) && !SplitMergeEnabled_ && SourceId; - - if (!SourceId) { - return ChoosePartition(ctx); - } - - TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping - : ESourceIdTableGeneration::SrcIdMeta2; - try { - EncodedSourceId = NSourceIdEncoding::EncodeSrcId( - FullConverter->GetTopicForSrcIdHash(), SourceId, TableGeneration - ); - } catch (yexception& e) { - return ReplyError(ErrorCode::BAD_REQUEST, TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), ctx); - } - - SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); - - DEBUG("SelectQuery: " << SelectQuery); - - if (pqConfig.GetTopicsAreFirstClassCitizen()) { - if (pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { - TThisActor::Become(&TThis::StateInit); - InitTable(ctx); - } else { - ChoosePartition(ctx); - } - } else { - TThisActor::Become(&TThis::StateInit); - StartKqpSession(ctx); - } -} - -void TPartitionChooserActor::Stop(const TActorContext& ctx) { - CloseKqpSession(ctx); - if (PipeToBalancer) { - NTabletPipe::CloseClient(ctx, PipeToBalancer); - } - IActor::Die(ctx); -} - -void TPartitionChooserActor::ScheduleStop() { - TThisActor::Become(&TThis::StateDestroy); -} - -TString TPartitionChooserActor::GetDatabaseName(const NActors::TActorContext& ctx) { - const auto& pqConfig = AppData(ctx)->PQConfig; - switch (TableGeneration) { - case ESourceIdTableGeneration::SrcIdMeta2: - return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig); - case ESourceIdTableGeneration::PartitionMapping: - return AppData(ctx)->TenantName; - } -} - -void TPartitionChooserActor::InitTable(const NActors::TActorContext& ctx) { - ctx.Send( - NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), - new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) - ); -} - -void TPartitionChooserActor::StartKqpSession(const NActors::TActorContext& ctx) { - auto ev = MakeCreateSessionRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); -} - -void TPartitionChooserActor::CloseKqpSession(const TActorContext& ctx) { - if (KqpSessionId) { - auto ev = MakeCloseSessionRequest(); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - KqpSessionId = ""; - } -} - -void TPartitionChooserActor::SendUpdateRequests(const TActorContext& ctx) { - TThisActor::Become(&TThis::StateUpdate); - - auto ev = MakeUpdateQueryRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - UpdatesInflight++; -} - -void TPartitionChooserActor::SendSelectRequest(const NActors::TActorContext& ctx) { - TThisActor::Become(&TThis::StateSelect); - - auto ev = MakeSelectQueryRequest(ctx); - ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); - - SelectInflight++; -} - -THolder TPartitionChooserActor::MakeCreateSessionRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder(); - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - return ev; -} - -THolder TPartitionChooserActor::MakeCloseSessionRequest() { - auto ev = MakeHolder(); - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - return ev; -} - -THolder TPartitionChooserActor::MakeSelectQueryRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder(); - - ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetQuery(SelectQuery); - - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - // fill tx settings: set commit tx flag& begin new serializable tx. - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - // keep compiled query in cache. - ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - - NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); - - SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); - - paramsBuilder - .AddParam("$Topic") - .Utf8(FullConverter->GetClientsideName()) - .Build() - .AddParam("$SourceId") - .Utf8(EncodedSourceId.EscapedSourceId) - .Build(); - - NYdb::TParams params = paramsBuilder.Build(); - - ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - - return ev; -} - -THolder TPartitionChooserActor::MakeUpdateQueryRequest(const NActors::TActorContext& ctx) { - auto ev = MakeHolder(); - - ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); - ev->Record.MutableRequest()->SetQuery(UpdateQuery); - ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); - // fill tx settings: set commit tx flag& begin new serializable tx. - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); - if (KqpSessionId) { - ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - } - if (TxId) { - ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); - TxId = ""; - } else { - ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - } - // keep compiled query in cache. - ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); - - NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); - - SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); - - paramsBuilder - .AddParam("$Topic") - .Utf8(FullConverter->GetClientsideName()) - .Build() - .AddParam("$SourceId") - .Utf8(EncodedSourceId.EscapedSourceId) - .Build() - .AddParam("$CreateTime") - .Uint64(CreateTime) - .Build() - .AddParam("$AccessTime") - .Uint64(TInstant::Now().MilliSeconds()) - .Build() - .AddParam("$Partition") - .Uint32(Partition->PartitionId) - .Build(); - - NYdb::TParams params = paramsBuilder.Build(); - - ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); - - return ev; -} - -void TPartitionChooserActor::RequestPQRB(const NActors::TActorContext& ctx) { - Y_ABORT_UNLESS(BalancerTabletId); - - if (!PipeToBalancer) { - NTabletPipe::TClientConfig clientConfig; - clientConfig.RetryPolicy = { - .RetryLimitCount = 6, - .MinRetryTime = TDuration::MilliSeconds(10), - .MaxRetryTime = TDuration::MilliSeconds(100), - .BackoffMultiplier = 2, - .DoFirstRetryInstantly = true - }; - PipeToBalancer = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig)); - } - - TThisActor::Become(&TThis::StateSelect); - NTabletPipe::SendData(ctx, PipeToBalancer, new TEvPersQueue::TEvGetPartitionIdForWrite()); -} - -void TPartitionChooserActor::ReplyResult(const NActors::TActorContext& ctx) { - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId)); -} - -void TPartitionChooserActor::ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { - ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage))); - - Stop(ctx); -} - -void TPartitionChooserActor::HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) { - StartKqpSession(ctx); -} - -void TPartitionChooserActor::HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { - const auto& record = ev->Get()->Record; - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << record, ctx); - return; - } - - KqpSessionId = record.GetResponse().GetSessionId(); - Y_ABORT_UNLESS(!KqpSessionId.empty()); - - SendSelectRequest(ctx); -} - -void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { - const auto& record = ev->Get()->Record; - KqpSessionId = record.GetResponse().GetSessionId(); - - Stop(ctx); -} - -void TPartitionChooserActor::HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record.GetRef(); - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << record, ctx); - } - - auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); - - TxId = record.GetResponse().GetTxMeta().id(); - Y_ABORT_UNLESS(!TxId.empty()); - - if (t.ListSize() != 0) { - auto& tt = t.GetList(0).GetStruct(0); - if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition - auto accessTime = t.GetList(0).GetStruct(2).GetOptional().GetUint64(); - if (accessTime > AccessTime) { // AccessTime - PartitionId = tt.GetOptional().GetUint32(); - DEBUG("Received partition " << PartitionId << " from table for SourceId=" << SourceId); - Partition = Chooser->GetPartition(PartitionId.value()); - CreateTime = t.GetList(0).GetStruct(1).GetOptional().GetUint64(); - AccessTime = accessTime; - } - } - } - - if (CreateTime == 0) { - CreateTime = TInstant::Now().MilliSeconds(); - } - - if (!Partition) { - ChoosePartition(ctx); - } else { - OnPartitionChosen(ctx); - } -} - -void TPartitionChooserActor::HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { - PartitionId = ev->Get()->Record.GetPartitionId(); - DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << SourceId); - Partition = Chooser->GetPartition(PartitionId.value()); - - OnPartitionChosen(ctx); -} - -void TPartitionChooserActor::HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - Y_UNUSED(ev); - - ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx); -} - -void TPartitionChooserActor::HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record.GetRef(); - - if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { - if (!PartitionPersisted) { - CloseKqpSession(ctx); - StartKqpSession(ctx); - } - return; - } - - if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { - if (!PartitionPersisted) { - ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx); - } - return; - } - - if (!PartitionPersisted) { - ReplyResult(ctx); - PartitionPersisted = true; - // Use tx only for query after select. Updating AccessTime without transaction. - CloseKqpSession(ctx); - } - - TThisActor::Become(&TThis::StateIdle); -} - -void TPartitionChooserActor::HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr&, const TActorContext& ctx) { - Stop(ctx); -} - -void TPartitionChooserActor::HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) { - if (PartitionPersisted) { - // we do not update AccessTime for Split/Merge partitions because don't use table. - SendUpdateRequests(ctx); - } -} - -void TPartitionChooserActor::ChoosePartition(const TActorContext& ctx) { - auto [roundRobin, p] = ChoosePartitionSync(ctx); - if (roundRobin) { - RequestPQRB(ctx); - } else { - Partition = p; - OnPartitionChosen(ctx); - } -} - -void TPartitionChooserActor::OnPartitionChosen(const TActorContext& ctx) { - if (!Partition && PreferedPartition) { - return ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "Prefered partition " << (PreferedPartition.value() + 1) << " is not exists or inactive.", - ctx); - } - - if (!Partition) { - return ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); - } - - if (PreferedPartition && Partition->PartitionId != PreferedPartition.value()) { - return ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "MessageGroupId " << SourceId << " is already bound to PartitionGroupId " - << (Partition->PartitionId + 1) << ", but client provided " << (PreferedPartition.value() + 1) - << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " - "another MessageGroupId, specify PartitionGroupId " << (Partition->PartitionId + 1) - << ", or do not specify PartitionGroupId at all.", - ctx); - } - - if (SplitMergeEnabled_ && SourceId && PartitionId) { - if (Partition != Chooser->GetPartition(SourceId)) { - return ReplyError(ErrorCode::BAD_REQUEST, - TStringBuilder() << "Message group " << SourceId << " not in a partition boundary", ctx); - } - } - - if (NeedUpdateTable) { - SendUpdateRequests(ctx); - } else { - TThisActor::Become(&TThis::StateIdle); - - ReplyResult(ctx); - } -} - -std::pair TPartitionChooserActor::ChoosePartitionSync(const TActorContext& ctx) const { - const auto& pqConfig = AppData(ctx)->PQConfig; - if (SourceId && SplitMergeEnabled_) { - return {false, Chooser->GetPartition(SourceId)}; - } else if (PreferedPartition) { - return {false, Chooser->GetPartition(PreferedPartition.value())}; - } else if (pqConfig.GetTopicsAreFirstClassCitizen() && SourceId) { - return {false, Chooser->GetPartition(SourceId)}; - } else { - return {true, nullptr}; - } -} - } // namespace NPartitionChooser @@ -469,7 +50,6 @@ std::shared_ptr CreatePartitionChooser(const NKikimrSchemeOp: } } - IActor* CreatePartitionChooserActor(TActorId parentId, const NKikimrSchemeOp::TPersQueueGroupDescription& config, NPersQueue::TTopicConverterPtr& fullConverter, @@ -477,8 +57,13 @@ IActor* CreatePartitionChooserActor(TActorId parentId, std::optional preferedPartition, bool withoutHash) { auto chooser = CreatePartitionChooser(config, withoutHash); - return new NPartitionChooser::TPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); + if (SplitMergeEnabled(config.GetPQTabletConfig())) { + return new NPartitionChooser::TSMPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); + } else { + return new NPartitionChooser::TPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); + } } - } // namespace NKikimr::NPQ + +std::unordered_map NKikimr::NTabletPipe::NTest::TPipeMock::Tablets; diff --git a/ydb/core/persqueue/writer/partition_chooser_impl.h b/ydb/core/persqueue/writer/partition_chooser_impl.h index 2fede5ae1226..9fdacf20dfac 100644 --- a/ydb/core/persqueue/writer/partition_chooser_impl.h +++ b/ydb/core/persqueue/writer/partition_chooser_impl.h @@ -1,29 +1,15 @@ #pragma once -#include #include -#include -#include -#include -#include -#include -#include #include -#include -#include -#include -#include "partition_chooser.h" +#include "partition_chooser_impl__old_chooser_actor.h" +#include "partition_chooser_impl__sm_chooser_actor.h" + namespace NKikimr::NPQ { namespace NPartitionChooser { -#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, message); - -using namespace NActors; -using namespace NSourceIdEncoding; -using namespace Ydb::PersQueue::ErrorCode; - // For testing purposes struct TAsIsSharder { ui32 operator()(const TString& sourceId, ui32 totalShards) const; @@ -60,6 +46,7 @@ class TBoundaryChooser: public IPartitionChooser { const TPartitionInfo* GetPartition(const TString& sourceId) const override; const TPartitionInfo* GetPartition(ui32 partitionId) const override; + const TPartitionInfo* GetRandomPartition() const override; private: const TString TopicName; @@ -68,13 +55,14 @@ class TBoundaryChooser: public IPartitionChooser { }; // It is old alghoritm of choosing partition by SourceId -template +template class THashChooser: public IPartitionChooser { public: THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescription& config); const TPartitionInfo* GetPartition(const TString& sourceId) const override; const TPartitionInfo* GetPartition(ui32 partitionId) const override; + const TPartitionInfo* GetRandomPartition() const override; private: std::vector Partitions; @@ -82,139 +70,6 @@ class THashChooser: public IPartitionChooser { }; -class TPartitionChooserActor: public TActorBootstrapped { - using TThis = TPartitionChooserActor; - using TThisActor = TActor; - - friend class TActorBootstrapped; -public: - using TPartitionInfo = typename IPartitionChooser::TPartitionInfo; - - TPartitionChooserActor(TActorId parentId, - const NKikimrSchemeOp::TPersQueueGroupDescription& config, - std::shared_ptr& chooser, - NPersQueue::TTopicConverterPtr& fullConverter, - const TString& sourceId, - std::optional preferedPartition); - - void Bootstrap(const TActorContext& ctx); - -private: - void HandleInit(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx); - void HandleInit(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx); - - STATEFN(StateInit) { - switch (ev->GetTypeRewrite()) { - HFunc(NMetadata::NProvider::TEvManagerPrepared, HandleInit); - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleInit); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - -private: - void HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); - void HandleSelect(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx); - void HandleSelect(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx); - - STATEFN(StateSelect) { - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleSelect); - HFunc(TEvPersQueue::TEvGetPartitionIdForWriteResponse, HandleSelect); - HFunc(TEvTabletPipe::TEvClientDestroyed, HandleSelect); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - -private: - void HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr& ev, const TActorContext& ctx); - - STATEFN(StateIdle) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvPartitionChooser::TEvRefreshRequest, HandleIdle); - SFunc(TEvents::TEvPoison, Stop); - } - } - -private: - void HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); - - STATEFN(StateUpdate) { - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleUpdate); - sFunc(TEvents::TEvPoison, ScheduleStop); - } - } - -private: - void HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx); - void HandleDestroy(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); - - STATEFN(StateDestroy) { - switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleDestroy); - HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleDestroy); - } - } - -private: - void ScheduleStop(); - void Stop(const TActorContext& ctx); - - void ChoosePartition(const TActorContext& ctx); - void OnPartitionChosen(const TActorContext& ctx); - std::pair ChoosePartitionSync(const TActorContext& ctx) const; - - TString GetDatabaseName(const NActors::TActorContext& ctx); - - void InitTable(const NActors::TActorContext& ctx); - - void StartKqpSession(const NActors::TActorContext& ctx); - void CloseKqpSession(const TActorContext& ctx); - void SendUpdateRequests(const TActorContext& ctx); - void SendSelectRequest(const NActors::TActorContext& ctx); - - void RequestPQRB(const NActors::TActorContext& ctx); - - THolder MakeCreateSessionRequest(const NActors::TActorContext& ctx); - THolder MakeCloseSessionRequest(); - THolder MakeSelectQueryRequest(const NActors::TActorContext& ctx); - THolder MakeUpdateQueryRequest(const NActors::TActorContext& ctx); - - void ReplyResult(const NActors::TActorContext& ctx); - void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx); - -private: - const TActorId Parent; - const NPersQueue::TTopicConverterPtr FullConverter; - const TString SourceId; - const std::optional PreferedPartition; - const std::shared_ptr Chooser; - const bool SplitMergeEnabled_; - - std::optional PartitionId; - const TPartitionInfo* Partition; - bool PartitionPersisted = false; - ui64 CreateTime = 0; - ui64 AccessTime = 0; - - bool NeedUpdateTable = false; - - NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; - TString KqpSessionId; - TString TxId; - - NPQ::ESourceIdTableGeneration TableGeneration; - TString SelectQuery; - TString UpdateQuery; - - size_t UpdatesInflight = 0; - size_t SelectInflight = 0; - - ui64 BalancerTabletId; - TActorId PipeToBalancer; -}; - - // // TBoundaryChooser // @@ -233,7 +88,7 @@ TBoundaryChooser::TBoundaryChooser(const NKikimrSchemeOp::TPersQueueGro } std::sort(Partitions.begin(), Partitions.end(), - [](const TPartitionInfo& a, const TPartitionInfo& b) { return a.ToBound && a.ToBound < b.ToBound; }); + [](const TPartitionInfo& a, const TPartitionInfo& b) { return !b.ToBound || (a.ToBound && a.ToBound < b.ToBound); }); } template @@ -252,6 +107,15 @@ const typename TBoundaryChooser::TPartitionInfo* TBoundaryChooser +const typename TBoundaryChooser::TPartitionInfo* TBoundaryChooser::GetRandomPartition() const { + if (Partitions.empty()) { + return nullptr; + } + size_t p = RandomNumber(Partitions.size()); + return &Partitions[p]; +} + // @@ -272,6 +136,9 @@ THashChooser::THashChooser(const NKikimrSchemeOp::TPersQueueGroupDescri template const typename THashChooser::TPartitionInfo* THashChooser::GetPartition(const TString& sourceId) const { + if (Partitions.empty()) { + return nullptr; + } return &Partitions[Hasher(sourceId, Partitions.size())]; } @@ -285,5 +152,32 @@ const typename THashChooser::TPartitionInfo* THashChooser::Get return it->PartitionId == partitionId ? it : nullptr; } +template +const typename THashChooser::TPartitionInfo* THashChooser::GetRandomPartition() const { + if (Partitions.empty()) { + return nullptr; + } + size_t p = RandomNumber(Partitions.size()); + return &Partitions[p]; +} + + } // namespace NPartitionChooser + + +inline IActor* CreatePartitionChooserActorM(TActorId parentId, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional preferedPartition, + bool withoutHash) { + auto chooser = CreatePartitionChooser(config, withoutHash); + if (SplitMergeEnabled(config.GetPQTabletConfig())) { + return new NPartitionChooser::TSMPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); + } else { + return new NPartitionChooser::TPartitionChooserActor(parentId, config, chooser, fullConverter, sourceId, preferedPartition); + } +} + + } // namespace NKikimr::NPQ diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h new file mode 100644 index 000000000000..8d718d8d454f --- /dev/null +++ b/ydb/core/persqueue/writer/partition_chooser_impl__abstract_chooser_actor.h @@ -0,0 +1,368 @@ +#pragma once + +#include "partition_chooser.h" +#include "partition_chooser_impl__partition_helper.h" +#include "partition_chooser_impl__table_helper.h" + +#include +#include +#include + +namespace NKikimr::NPQ::NPartitionChooser { + +#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) +#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" +#endif + + +#define LOG_PREFIX "TPartitionChooser " << SelfId() \ + << " (SourceId=" << SourceId \ + << ", PreferedPartition=" << PreferedPartition \ + << ") " +#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); + +using TPartitionInfo = typename IPartitionChooser::TPartitionInfo; + +using namespace NActors; +using namespace NSourceIdEncoding; +using namespace Ydb::PersQueue::ErrorCode; + +template +class TAbstractPartitionChooserActor: public TActorBootstrapped { +public: + using TThis = TAbstractPartitionChooserActor; + using TThisActor = TActor; + + + TAbstractPartitionChooserActor(TActorId parentId, + std::shared_ptr& chooser, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional preferedPartition) + : Parent(parentId) + , SourceId(sourceId) + , PreferedPartition(preferedPartition) + , Chooser(chooser) + , TableHelper(fullConverter->GetClientsideName(), fullConverter->GetTopicForSrcIdHash()) + , PartitionHelper() { + } + + TActorIdentity SelfId() const { + return TActor::SelfId(); + } + + void Initialize(const NActors::TActorContext& ctx) { + TableHelper.Initialize(ctx, SourceId); + } + + void PassAway() { + auto ctx = TActivationContext::ActorContextFor(SelfId()); + TableHelper.CloseKqpSession(ctx); + PartitionHelper.Close(ctx); + } + + bool NeedTable(const NActors::TActorContext& ctx) { + const auto& pqConfig = AppData(ctx)->PQConfig; + return SourceId && (!pqConfig.GetTopicsAreFirstClassCitizen() || pqConfig.GetUseSrcIdMetaMappingInFirstClass()); + } + +protected: + void InitTable(const NActors::TActorContext& ctx) { + TThis::Become(&TThis::StateInitTable); + const auto& pqConfig = AppData(ctx)->PQConfig; + if (SourceId && pqConfig.GetTopicsAreFirstClassCitizen() && pqConfig.GetUseSrcIdMetaMappingInFirstClass()) { + DEBUG("InitTable"); + TableHelper.SendInitTableRequest(ctx); + } else { + StartKqpSession(ctx); + } + } + + void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) { + StartKqpSession(ctx); + } + + STATEFN(StateInitTable) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); + SFunc(TEvents::TEvPoison, TThis::Die); + } + } + +protected: + void StartKqpSession(const NActors::TActorContext& ctx) { + if (NeedTable(ctx)) { + DEBUG("StartKqpSession") + TThis::Become(&TThis::StateCreateKqpSession); + TableHelper.SendCreateSessionRequest(ctx); + } else { + OnSelected(ctx); + } + } + + void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { + if (!TableHelper.Handle(ev, ctx)) { + return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ53 : " << ev->Get()->Record.DebugString(), ctx); + } + + SendSelectRequest(ctx); + } + + void ScheduleStop() { + TThis::Become(&TThis::StateDestroying); + } + + STATEFN(StateCreateKqpSession) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + +protected: + void SendSelectRequest(const NActors::TActorContext& ctx) { + TThis::Become(&TThis::StateSelect); + DEBUG("Select from the table"); + TableHelper.SendSelectRequest(ctx); + } + + void HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + if (!TableHelper.HandleSelect(ev, ctx)) { + return ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ50 : " << ev->Get()->Record.DebugString(), ctx); + } + + TRACE("Selected from table PartitionId=" << TableHelper.PartitionId() << " SeqNo=" << TableHelper.SeqNo()); + if (TableHelper.PartitionId()) { + Partition = Chooser->GetPartition(TableHelper.PartitionId().value()); + } + SeqNo = TableHelper.SeqNo(); + + OnSelected(ctx); + } + + virtual void OnSelected(const NActors::TActorContext& ctx) = 0; + + STATEFN(StateSelect) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleSelect); + SFunc(TEvents::TEvPoison, TThis::Die); + } + } + +protected: + void SendUpdateRequests(const TActorContext& ctx) { + if (NeedTable(ctx)) { + TThis::Become(&TThis::StateUpdate); + DEBUG("Update the table"); + TableHelper.SendUpdateRequest(Partition->PartitionId, SeqNo, ctx); + } else { + StartGetOwnership(ctx); + } + } + + void HandleUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record.GetRef(); + DEBUG("HandleUpdate PartitionPersisted=" << PartitionPersisted << " Status=" << record.GetYdbStatus()); + + if (record.GetYdbStatus() == Ydb::StatusIds::ABORTED) { + if (!PartitionPersisted) { + TableHelper.CloseKqpSession(ctx); + StartKqpSession(ctx); + } + return; + } + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + if (!PartitionPersisted) { + ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "kqp error Marker# PQ51 : " << record, ctx); + } + return; + } + + if (!PartitionPersisted) { + PartitionPersisted = true; + // Use tx only for query after select. Updating AccessTime without transaction. + TableHelper.CloseKqpSession(ctx); + + return StartGetOwnership(ctx); + } + + StartIdle(); + } + + STATEFN(StateUpdate) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleUpdate); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + +protected: + void StartCheckPartitionRequest(const TActorContext &ctx) { + TThis::Become(&TThis::StateCheckPartition); + PartitionHelper.Open(Partition->TabletId, ctx); + PartitionHelper.SendCheckPartitionStatusRequest(Partition->PartitionId, ctx); + } + + void Handle(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse::TPtr& ev, const NActors::TActorContext& ctx) { + if (NKikimrPQ::ETopicPartitionStatus::Active == ev->Get()->Record.GetStatus()) { + PartitionHelper.Close(ctx); + return SendUpdateRequests(ctx); + } + ReplyError(ErrorCode::INITIALIZING, TStringBuilder() << "Partition isn`t active", ctx); + } + + STATEFN(StateCheckPartition) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(NKikimr::TEvPQ::TEvCheckPartitionStatusResponse, Handle); + HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership); + HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + +protected: + void StartGetOwnership(const TActorContext &ctx) { + TThis::Become(&TThis::StateOwnership); + if (!Partition) { + return ReplyError(ErrorCode::INITIALIZING, "Partition not choosed", ctx); + } + + DEBUG("GetOwnership Partition TabletId=" << Partition->TabletId); + + PartitionHelper.Open(Partition->TabletId, ctx); + PartitionHelper.SendGetOwnershipRequest(Partition->PartitionId, SourceId, true, ctx); + } + + void HandleOwnership(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) { + DEBUG("HandleOwnership"); + auto& record = ev->Get()->Record; + + TString error; + if (!BasicCheck(record, error)) { + return ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx); + } + + const auto& response = record.GetPartitionResponse(); + if (!response.HasCmdGetOwnershipResult()) { + return ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx); + } + + if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { + return ReplyError(ErrorCode::INITIALIZING, "Partition is not active", ctx); + } + + OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); + + PartitionHelper.Close(ctx); + + OnOwnership(ctx); + } + + void HandleOwnership(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) { + auto msg = ev->Get(); + if (PartitionHelper.IsPipe(ev->Sender) && msg->Status != NKikimrProto::OK) { + TableHelper.CloseKqpSession(ctx); + ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx); + } + } + + void HandleOwnership(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) { + if (PartitionHelper.IsPipe(ev->Sender)) { + TableHelper.CloseKqpSession(ctx); + ReplyError(ErrorCode::INITIALIZING, "Pipe closed", ctx); + } + } + + virtual void OnOwnership(const TActorContext &ctx) = 0; + + STATEFN(StateOwnership) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(TEvPersQueue::TEvResponse, HandleOwnership); + HFunc(TEvTabletPipe::TEvClientConnected, HandleOwnership); + HFunc(TEvTabletPipe::TEvClientDestroyed, HandleOwnership); + sFunc(TEvents::TEvPoison, ScheduleStop); + } + } + + +protected: + void StartIdle() { + TThis::Become(&TThis::StateIdle); + DEBUG("Start idle"); + } + + void HandleIdle(TEvPartitionChooser::TEvRefreshRequest::TPtr&, const TActorContext& ctx) { + if (PartitionPersisted) { + SendUpdateRequests(ctx); + } + } + + STATEFN(StateIdle) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(TEvPartitionChooser::TEvRefreshRequest, HandleIdle); + SFunc(TEvents::TEvPoison, TThis::Die); + } + } + + +protected: + void HandleDestroy(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const NActors::TActorContext& ctx) { + TableHelper.Handle(ev, ctx); + TThis::Die(ctx); + } + + STATEFN(StateDestroying) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, HandleDestroy); + } + } + +protected: + void ReplyResult(const NActors::TActorContext& ctx) { + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseResult(Partition->PartitionId, Partition->TabletId, TThis::OwnerCookie, SeqNo)); + } + + void ReplyError(ErrorCode code, TString&& errorMessage, const NActors::TActorContext& ctx) { + INFO("ReplyError: " << errorMessage); + ctx.Send(Parent, new TEvPartitionChooser::TEvChooseError(code, std::move(errorMessage))); + + TThis::Die(ctx); + } + + +protected: + const TActorId Parent; + const TString SourceId; + const std::optional PreferedPartition; + const std::shared_ptr Chooser; + + const TPartitionInfo* Partition = nullptr; + + TTableHelper TableHelper; + TPartitionHelper PartitionHelper; + + bool PartitionPersisted = false; + + TString OwnerCookie; + std::optional SeqNo = 0; +}; + +#undef LOG_PREFIX +#undef TRACE +#undef DEBUG +#undef INFO +#undef ERROR + +} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h new file mode 100644 index 000000000000..97a975095228 --- /dev/null +++ b/ydb/core/persqueue/writer/partition_chooser_impl__old_chooser_actor.h @@ -0,0 +1,163 @@ +#pragma once + +#include "partition_chooser_impl__abstract_chooser_actor.h" +#include "partition_chooser_impl__pqrb_helper.h" + +namespace NKikimr::NPQ::NPartitionChooser { + +#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) +#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" +#endif + + +#define LOG_PREFIX "TPartitionChooser " << SelfId() \ + << " (SourceId=" << TThis::SourceId \ + << ", PreferedPartition=" << TThis::PreferedPartition \ + << ") " +#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); + +template +class TPartitionChooserActor: public TAbstractPartitionChooserActor, TPipeCreator> { +public: + using TThis = TPartitionChooserActor; + using TThisActor = TActor; + using TParentActor = TAbstractPartitionChooserActor, TPipeCreator>; + + TPartitionChooserActor(TActorId parentId, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + std::shared_ptr& chooser, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional preferedPartition) + : TAbstractPartitionChooserActor, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition) + , PQRBHelper(config.GetBalancerTabletID()) { + } + + void Bootstrap(const TActorContext& ctx) { + TThis::Initialize(ctx); + TThis::InitTable(ctx); + } + + TActorIdentity SelfId() const { + return TActor>::SelfId(); + } + + void OnSelected(const TActorContext &ctx) override { + if (TThis::Partition) { + return OnPartitionChosen(ctx); + } + + auto [roundRobin, p] = ChoosePartitionSync(ctx); + if (roundRobin) { + RequestPQRB(ctx); + } else { + TThis::Partition = p; + OnPartitionChosen(ctx); + } + } + + void OnOwnership(const TActorContext &ctx) override { + DEBUG("OnOwnership"); + TThis::ReplyResult(ctx); + } + +private: + void RequestPQRB(const NActors::TActorContext& ctx) { + DEBUG("RequestPQRB") + TThis::Become(&TThis::StatePQRB); + + if (PQRBHelper.PartitionId()) { + PartitionId = PQRBHelper.PartitionId(); + OnPartitionChosen(ctx); + } else { + PQRBHelper.SendRequest(ctx); + } + } + + void Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { + PartitionId = PQRBHelper.Handle(ev, ctx); + DEBUG("Received partition " << PartitionId << " from PQRB for SourceId=" << TThis::SourceId); + TThis::Partition = TThis::Chooser->GetPartition(PQRBHelper.PartitionId().value()); + + PQRBHelper.Close(ctx); + + OnPartitionChosen(ctx); + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx) { + if (PQRBHelper.IsPipe(ev->Sender) && ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { + TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe connection fail", ctx); + } + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx) { + if(PQRBHelper.IsPipe(ev->Sender)) { + TThis::ReplyError(ErrorCode::INITIALIZING, "Pipe destroyed", ctx); + } + } + + STATEFN(StatePQRB) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(TEvPersQueue::TEvGetPartitionIdForWriteResponse, Handle); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + SFunc(TEvents::TEvPoison, TThis::Die); + } + } + + +private: + void OnPartitionChosen(const TActorContext& ctx) { + TRACE("OnPartitionChosen"); + + if (!TThis::Partition && TThis::PreferedPartition) { + return TThis::ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "Prefered partition " << (TThis::PreferedPartition.value() + 1) << " is not exists or inactive.", + ctx); + } + + if (!TThis::Partition) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); + } + + if (TThis::PreferedPartition && TThis::Partition->PartitionId != TThis::PreferedPartition.value()) { + return TThis::ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "MessageGroupId " << TThis::SourceId << " is already bound to PartitionGroupId " + << (TThis::Partition->PartitionId + 1) << ", but client provided " << (TThis::PreferedPartition.value() + 1) + << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " + "another MessageGroupId, specify PartitionGroupId " << (TThis::Partition->PartitionId + 1) + << ", or do not specify PartitionGroupId at all.", + ctx); + } + + TThis::SendUpdateRequests(ctx); + } + + std::pair ChoosePartitionSync(const TActorContext& ctx) const { + const auto& pqConfig = AppData(ctx)->PQConfig; + if (TThis::PreferedPartition) { + return {false, TThis::Chooser->GetPartition(TThis::PreferedPartition.value())}; + } else if (pqConfig.GetTopicsAreFirstClassCitizen() && TThis::SourceId) { + return {false, TThis::Chooser->GetPartition(TThis::SourceId)}; + } else { + return {true, nullptr}; + } + }; + + +private: + std::optional PartitionId; + + TPQRBHelper PQRBHelper; +}; + +#undef LOG_PREFIX +#undef TRACE +#undef DEBUG +#undef INFO +#undef ERROR + +} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h new file mode 100644 index 000000000000..463217557c6b --- /dev/null +++ b/ydb/core/persqueue/writer/partition_chooser_impl__partition_helper.h @@ -0,0 +1,86 @@ +#pragma once + +#include "common.h" +#include "pipe_utils.h" +#include "source_id_encoding.h" + +#include +#include +#include + +namespace NKikimr::NPQ::NPartitionChooser { + +template +class TPartitionHelper { +public: + void Open(ui64 tabletId, const TActorContext& ctx) { + Close(ctx); + + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(100), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; + Pipe = ctx.RegisterWithSameMailbox(TPipeCreator::CreateClient(ctx.SelfID, tabletId, clientConfig)); + } + + void SendGetOwnershipRequest(ui32 partitionId, const TString& sourceId, bool registerIfNotExists, const TActorContext& ctx) { + auto ev = MakeRequest(partitionId, Pipe); + + auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership(); + cmd.SetOwner(sourceId ? sourceId : CreateGuidAsString()); + cmd.SetForce(true); + cmd.SetRegisterIfNotExists(registerIfNotExists); + + NTabletPipe::SendData(ctx, Pipe, ev.Release()); + } + + void SendMaxSeqNoRequest(ui32 partitionId, const TString& sourceId, const TActorContext& ctx) { + auto ev = MakeRequest(partitionId, Pipe); + + auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetMaxSeqNo(); + cmd.AddSourceId(NSourceIdEncoding::EncodeSimple(sourceId)); + + NTabletPipe::SendData(ctx, Pipe, ev.Release()); + } + + void SendCheckPartitionStatusRequest(ui32 partitionId, const TActorContext& ctx) { + auto ev = MakeHolder(partitionId); + + NTabletPipe::SendData(ctx, Pipe, ev.Release()); + } + + void Close(const TActorContext& ctx) { + if (Pipe) { + NTabletPipe::CloseClient(ctx, Pipe); + Pipe = TActorId(); + } + } + + const TString& OwnerCookie() const { + return OwnerCookie_; + } + + bool IsPipe(const TActorId& actorId) const { + return actorId == Pipe; + } + +private: + THolder MakeRequest(ui32 partitionId, TActorId pipe) { + auto ev = MakeHolder(); + + ev->Record.MutablePartitionRequest()->SetPartition(partitionId); + ActorIdToProto(pipe, ev->Record.MutablePartitionRequest()->MutablePipeClient()); + + return ev; + } + +private: + TActorId Pipe; + TString OwnerCookie_; +}; + +} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h new file mode 100644 index 000000000000..c1a235901b4a --- /dev/null +++ b/ydb/core/persqueue/writer/partition_chooser_impl__pqrb_helper.h @@ -0,0 +1,65 @@ +#pragma once + +#include "pipe_utils.h" + +#include + + +namespace NKikimr::NPQ::NPartitionChooser { + +template +class TPQRBHelper { +public: + TPQRBHelper(ui64 balancerTabletId) + : BalancerTabletId(balancerTabletId) { + } + + std::optional PartitionId() const { + return PartitionId_; + } + + void SendRequest(const NActors::TActorContext& ctx) { + Y_ABORT_UNLESS(BalancerTabletId); + + if (!Pipe) { + NTabletPipe::TClientConfig clientConfig; + clientConfig.RetryPolicy = { + .RetryLimitCount = 6, + .MinRetryTime = TDuration::MilliSeconds(10), + .MaxRetryTime = TDuration::MilliSeconds(100), + .BackoffMultiplier = 2, + .DoFirstRetryInstantly = true + }; + Pipe = ctx.RegisterWithSameMailbox(TPipeCreator::CreateClient(ctx.SelfID, BalancerTabletId, clientConfig)); + } + + NTabletPipe::SendData(ctx, Pipe, new TEvPersQueue::TEvGetPartitionIdForWrite()); + } + + ui32 Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const TActorContext& ctx) { + Close(ctx); + + PartitionId_ = ev->Get()->Record.GetPartitionId(); + return PartitionId_.value(); + } + + void Close(const TActorContext& ctx) { + if (Pipe) { + NTabletPipe::CloseClient(ctx, Pipe); + Pipe = TActorId(); + } + } + + bool IsPipe(const TActorId& actorId) const { + return actorId == Pipe; + } + +private: + const ui64 BalancerTabletId; + + TActorId Pipe; + std::optional PartitionId_; +}; + + +} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h new file mode 100644 index 000000000000..695af84d5976 --- /dev/null +++ b/ydb/core/persqueue/writer/partition_chooser_impl__sm_chooser_actor.h @@ -0,0 +1,272 @@ +#pragma once + +#include "partition_chooser_impl__abstract_chooser_actor.h" + +#include + +namespace NKikimr::NPQ::NPartitionChooser { + +#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) +#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" +#endif + + +#define LOG_PREFIX "TPartitionChooser " << SelfId() \ + << " (SourceId=" << TThis::SourceId \ + << ", PreferedPartition=" << TThis::PreferedPartition \ + << ") " +#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define INFO(message) LOG_INFO_S (*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); + +template +class TSMPartitionChooserActor: public TAbstractPartitionChooserActor, TPipeCreator> { +public: + using TThis = TSMPartitionChooserActor; + using TThisActor = TActor; + using TParentActor = TAbstractPartitionChooserActor, TPipeCreator>; + + TSMPartitionChooserActor(TActorId parentId, + const NKikimrSchemeOp::TPersQueueGroupDescription& config, + std::shared_ptr& chooser, + NPersQueue::TTopicConverterPtr& fullConverter, + const TString& sourceId, + std::optional preferedPartition) + : TAbstractPartitionChooserActor, TPipeCreator>(parentId, chooser, fullConverter, sourceId, preferedPartition) + , Graph(MakePartitionGraph(config)) { + + } + + void Bootstrap(const TActorContext& ctx) { + BoundaryPartition = ChoosePartitionSync(); + + if (TThis::SourceId) { + TThis::Initialize(ctx); + GetOwnershipFast(ctx); + } else { + TThis::Partition = BoundaryPartition; + TThis::StartGetOwnership(ctx); + } + } + + TActorIdentity SelfId() const { + return TActor>::SelfId(); + } + + void OnSelected(const TActorContext &ctx) override { + if (TThis::Partition) { + // If we have found a partition, it means that the partition is active, which means + // that we need to continue writing to it, and it does not matter whether it falls + // within the boundaries of the distribution. + return OnPartitionChosen(ctx); + } + + if (!TThis::TableHelper.PartitionId()) { + // They didn't write with this SourceId earlier, or the SourceID has already been deleted. + TThis::Partition = BoundaryPartition; + return OnPartitionChosen(ctx); + } + + const auto* node = Graph.GetPartition(TThis::TableHelper.PartitionId().value()); + if (!node) { + // The partition where the writting was performed earlier has already been deleted. + // We can write without taking into account the hierarchy of the partition. + TThis::Partition = BoundaryPartition; + return OnPartitionChosen(ctx); + } + + // Choosing a partition based on the split and merge hierarchy. + auto activeChildren = Graph.GetActiveChildren(TThis::TableHelper.PartitionId().value()); + if (activeChildren.empty()) { + return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "has't active partition Marker# PC01", ctx); + } + + if (activeChildren.contains(BoundaryPartition->PartitionId)) { + // First of all, we are trying to write to the partition taking into account the + // distribution boundaries. + TThis::Partition = BoundaryPartition; + } else { + // It is important to save the write into account the hierarchy of partitions, because + // this will preserve the guarantees of the order of reading. + auto n = RandomNumber(activeChildren.size()); + std::vector ac; + ac.reserve(activeChildren.size()); + ac.insert(ac.end(), activeChildren.begin(), activeChildren.end()); + auto id = ac[n]; + TThis::Partition = TThis::Chooser->GetPartition(id); + } + + if (!TThis::Partition) { + return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "can't choose partition Marker# PC02", ctx); + } + + GetOldSeqNo(ctx); + } + + void OnOwnership(const TActorContext &ctx) override { + DEBUG("OnOwnership"); + TThis::ReplyResult(ctx); + } + +private: + void GetOwnershipFast(const TActorContext &ctx) { + TThis::Become(&TThis::StateOwnershipFast); + if (!BoundaryPartition) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "A partition not choosed", ctx); + } + + DEBUG("GetOwnershipFast Partition=" << BoundaryPartition->PartitionId << " TabletId=" << BoundaryPartition->TabletId); + + TThis::PartitionHelper.Open(BoundaryPartition->TabletId, ctx); + TThis::PartitionHelper.SendGetOwnershipRequest(BoundaryPartition->PartitionId, TThis::SourceId, false, ctx); + } + + void HandleOwnershipFast(TEvPersQueue::TEvResponse::TPtr& ev, const NActors::TActorContext& ctx) { + DEBUG("HandleOwnershipFast"); + auto& record = ev->Get()->Record; + + TString error; + if (!BasicCheck(record, error)) { + return TThis::InitTable(ctx); + } + + const auto& response = record.GetPartitionResponse(); + if (!response.HasCmdGetOwnershipResult()) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent Ownership result", ctx); + } + + if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "Configuration changed", ctx); + } + + TThis::OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); + + if (response.GetCmdGetOwnershipResult().GetSeqNo() > 0) { + // Fast path: the partition ative and already written + TThis::SendUpdateRequests(ctx); + return TThis::ReplyResult(ctx); + } + + TThis::InitTable(ctx); + } + + STATEFN(StateOwnershipFast) { + TRACE_EVENT(NKikimrServices::PQ_PARTITION_CHOOSER); + switch (ev->GetTypeRewrite()) { + HFunc(TEvPersQueue::TEvResponse, HandleOwnershipFast); + HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership); + HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership); + SFunc(TEvents::TEvPoison, TThis::Die); + } + } + + +private: + void GetOldSeqNo(const TActorContext &ctx) { + DEBUG("GetOldSeqNo"); + TThis::Become(&TThis::StateGetMaxSeqNo); + + const auto* oldNode = Graph.GetPartition(TThis::TableHelper.PartitionId().value()); + + if (!oldNode) { + return TThis::ReplyError(ErrorCode::ERROR, TStringBuilder() << "Inconsistent status Marker# PC03", ctx); + } + + TThis::PartitionHelper.Open(oldNode->TabletId, ctx); + TThis::PartitionHelper.SendMaxSeqNoRequest(oldNode->Id, TThis::SourceId, ctx); + } + + void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + + TString error; + if (!BasicCheck(record, error)) { + return TThis::ReplyError(ErrorCode::INITIALIZING, std::move(error), ctx); + } + + const auto& response = record.GetPartitionResponse(); + if (!response.HasCmdGetMaxSeqNoResult()) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "Absent MaxSeqNo result", ctx); + } + + const auto& result = response.GetCmdGetMaxSeqNoResult(); + if (result.SourceIdInfoSize() < 1) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "Empty source id info", ctx); + } + + const auto& sourceIdInfo = result.GetSourceIdInfo(0); + switch (sourceIdInfo.GetState()) { + case NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED: + TThis::SeqNo = sourceIdInfo.GetSeqNo(); + break; + case NKikimrPQ::TMessageGroupInfo::STATE_PENDING_REGISTRATION: + case NKikimrPQ::TMessageGroupInfo::STATE_UNKNOWN: + TThis::SeqNo = TThis::TableHelper.SeqNo(); + break; + } + + TThis::PartitionHelper.Close(ctx); + TThis::StartCheckPartitionRequest(ctx); + } + + STATEFN(StateGetMaxSeqNo) { + switch (ev->GetTypeRewrite()) { + HFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo); + SFunc(TEvents::TEvPoison, TThis::Die); + HFunc(TEvTabletPipe::TEvClientConnected, TThis::HandleOwnership); + HFunc(TEvTabletPipe::TEvClientDestroyed, TThis::HandleOwnership); + } + } + + +private: + void OnPartitionChosen(const TActorContext& ctx) { + TRACE("OnPartitionChosen"); + + if (!TThis::Partition && TThis::PreferedPartition) { + return TThis::ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "Prefered partition " << (TThis::PreferedPartition.value() + 1) << " is not exists or inactive.", + ctx); + } + + if (!TThis::Partition) { + return TThis::ReplyError(ErrorCode::INITIALIZING, "Can't choose partition", ctx); + } + + if (TThis::PreferedPartition && TThis::Partition->PartitionId != TThis::PreferedPartition.value()) { + return TThis::ReplyError(ErrorCode::BAD_REQUEST, + TStringBuilder() << "MessageGroupId " << TThis::SourceId << " is already bound to PartitionGroupId " + << (TThis::Partition->PartitionId + 1) << ", but client provided " << (TThis::PreferedPartition.value() + 1) + << ". MessageGroupId->PartitionGroupId binding cannot be changed, either use " + "another MessageGroupId, specify PartitionGroupId " << (TThis::Partition->PartitionId + 1) + << ", or do not specify PartitionGroupId at all.", + ctx); + } + + TThis::StartCheckPartitionRequest(ctx); + } + + const TPartitionInfo* ChoosePartitionSync() const { + if (TThis::PreferedPartition) { + return TThis::Chooser->GetPartition(TThis::PreferedPartition.value()); + } else if (TThis::SourceId) { + return TThis::Chooser->GetPartition(TThis::SourceId); + } else { + return TThis::Chooser->GetRandomPartition(); + } + }; + + +private: + const TPartitionInfo* BoundaryPartition = nullptr; + const TPartitionGraph Graph; +}; + +#undef LOG_PREFIX +#undef TRACE +#undef DEBUG +#undef INFO +#undef ERROR + +} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h b/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h new file mode 100644 index 000000000000..3dee14a904c2 --- /dev/null +++ b/ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h @@ -0,0 +1,279 @@ +#pragma once + +#include "metadata_initializers.h" +#include "source_id_encoding.h" + +#include +#include +#include +#include +#include +#include + + +namespace NKikimr::NPQ::NPartitionChooser { + +#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) +#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" +#endif + + +#define LOG_PREFIX "TTableHelper " +#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); +#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_PARTITION_CHOOSER, LOG_PREFIX << message); + + +class TTableHelper { +public: + TTableHelper(const TString& topicName, const TString& topicHashName) + : TopicName(topicName) + , TopicHashName(topicHashName) { + }; + + std::optional PartitionId() const { + return PartitionId_; + } + + std::optional SeqNo() const { + return SeqNo_; + } + + bool Initialize(const TActorContext& ctx, const TString& sourceId) { + const auto& pqConfig = AppData(ctx)->PQConfig; + + TableGeneration = pqConfig.GetTopicsAreFirstClassCitizen() ? ESourceIdTableGeneration::PartitionMapping + : ESourceIdTableGeneration::SrcIdMeta2; + try { + EncodedSourceId = NSourceIdEncoding::EncodeSrcId( + TopicHashName, sourceId, TableGeneration + ); + } catch (yexception& e) { + return false; + } + + SelectQuery = GetSelectSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + UpdateQuery = GetUpdateSourceIdQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + UpdateAccessTimeQuery = GetUpdateAccessTimeQueryFromPath(pqConfig.GetSourceIdTablePath(), TableGeneration); + + DEBUG("SelectQuery: " << SelectQuery); + DEBUG("UpdateQuery: " << UpdateQuery); + DEBUG("UpdateAccessTimeQuery: " << UpdateAccessTimeQuery); + + return true; + } + + TString GetDatabaseName(const TActorContext& ctx) { + const auto& pqConfig = AppData(ctx)->PQConfig; + switch (TableGeneration) { + case ESourceIdTableGeneration::SrcIdMeta2: + return NKikimr::NPQ::GetDatabaseFromConfig(pqConfig); + case ESourceIdTableGeneration::PartitionMapping: + return AppData(ctx)->TenantName; + } + } + + void SendInitTableRequest(const TActorContext& ctx) { + ctx.Send( + NMetadata::NProvider::MakeServiceId(ctx.SelfID.NodeId()), + new NMetadata::NProvider::TEvPrepareManager(NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()) + ); + } + + void SendCreateSessionRequest(const TActorContext& ctx) { + auto ev = MakeCreateSessionRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + } + + THolder MakeCreateSessionRequest(const TActorContext& ctx) { + auto ev = MakeHolder(); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + return ev; + } + + bool Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& /*ctx*/) { + const auto& record = ev->Get()->Record; + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return false; + } + + KqpSessionId = record.GetResponse().GetSessionId(); + Y_ABORT_UNLESS(!KqpSessionId.empty()); + + return true; + } + + void CloseKqpSession(const TActorContext& ctx) { + if (KqpSessionId) { + auto ev = MakeCloseSessionRequest(); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + + KqpSessionId = ""; + } + } + + THolder MakeCloseSessionRequest() { + auto ev = MakeHolder(); + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + return ev; + } + + void SendSelectRequest(const TActorContext& ctx) { + auto ev = MakeSelectQueryRequest(ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + } + + THolder MakeSelectQueryRequest(const NActors::TActorContext& ctx) { + auto ev = MakeHolder(); + + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(SelectQuery); + + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + // fill tx settings: set commit tx flag& begin new serializable tx. + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false); + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + // keep compiled query in cache. + ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(TopicName) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); + + return ev; + } + + bool HandleSelect(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& /*ctx*/) { + auto& record = ev->Get()->Record.GetRef(); + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return false; + } + + auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + + TxId = record.GetResponse().GetTxMeta().id(); + Y_ABORT_UNLESS(!TxId.empty()); + + if (t.ListSize() != 0) { + auto& list = t.GetList(0); + auto& tt = list.GetStruct(0); + if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition + auto accessTime = list.GetStruct(2).GetOptional().GetUint64(); + if (accessTime > AccessTime) { // AccessTime + PartitionId_ = tt.GetOptional().GetUint32(); + CreateTime = list.GetStruct(1).GetOptional().GetUint64(); + AccessTime = accessTime; + SeqNo_ = list.GetStruct(3).GetOptional().GetUint64(); + } + } + } + + if (CreateTime == 0) { + CreateTime = TInstant::Now().MilliSeconds(); + } + + return true; + } + + void SendUpdateRequest(ui32 partitionId, std::optional seqNo, const TActorContext& ctx) { + auto ev = MakeUpdateQueryRequest(partitionId, seqNo, ctx); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + } + + THolder MakeUpdateQueryRequest(ui32 partitionId, std::optional seqNo, const NActors::TActorContext& ctx) { + auto ev = MakeHolder(); + + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + ev->Record.MutableRequest()->SetQuery(TxId ? UpdateQuery : UpdateAccessTimeQuery); + ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx)); + // fill tx settings: set commit tx flag& begin new serializable tx. + ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + if (KqpSessionId) { + ev->Record.MutableRequest()->SetSessionId(KqpSessionId); + } + if (TxId) { + ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(TxId); + TxId = ""; + } else { + ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + } + // keep compiled query in cache. + ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + + NYdb::TParamsBuilder paramsBuilder = NYdb::TParamsBuilder(); + + SetHashToTParamsBuilder(paramsBuilder, EncodedSourceId); + + paramsBuilder + .AddParam("$Topic") + .Utf8(TopicName) + .Build() + .AddParam("$SourceId") + .Utf8(EncodedSourceId.EscapedSourceId) + .Build() + .AddParam("$CreateTime") + .Uint64(CreateTime) + .Build() + .AddParam("$AccessTime") + .Uint64(TInstant::Now().MilliSeconds()) + .Build() + .AddParam("$SeqNo") + .Uint64(seqNo.value_or(0)) + .Build() + .AddParam("$Partition") + .Uint32(partitionId) + .Build(); + + NYdb::TParams params = paramsBuilder.Build(); + + ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(params))); + + return ev; + } + +private: + const TString TopicName; + const TString TopicHashName; + + NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId; + + NPQ::ESourceIdTableGeneration TableGeneration; + TString SelectQuery; + TString UpdateQuery; + TString UpdateAccessTimeQuery; + + TString KqpSessionId; + TString TxId; + + ui64 CreateTime = 0; + ui64 AccessTime = 0; + + std::optional PartitionId_; + std::optional SeqNo_; +}; + +#undef LOG_PREFIX +#undef TRACE +#undef DEBUG +#undef INFO +#undef ERROR + +} // namespace NKikimr::NPQ::NPartitionChooser diff --git a/ydb/core/persqueue/writer/pipe_utils.h b/ydb/core/persqueue/writer/pipe_utils.h new file mode 100644 index 000000000000..db78a3ce123d --- /dev/null +++ b/ydb/core/persqueue/writer/pipe_utils.h @@ -0,0 +1,76 @@ +#pragma once + +#include +#include + +#include + +namespace NKikimr::NTabletPipe { + +class TPipeHelper { +public: + static IActor* CreateClient(const TActorId& owner, ui64 tabletId, const TClientConfig& config = TClientConfig()) { + return NKikimr::NTabletPipe::CreateClient(owner, tabletId, config); + } + + static void SendData(const TActorContext& ctx, const TActorId& clientId, IEventBase* payload, ui64 cookie = 0, NWilson::TTraceId traceId = {}) { + return NKikimr::NTabletPipe::SendData(ctx, clientId, payload, cookie, std::move(traceId)); + } +}; + +namespace NTest { + + class TPipeMock { + private: + class TPipeActorMock: public TActorBootstrapped { + public: + TPipeActorMock(const TActorId& clientId, ui64 tabletId, const TActorId& forwardTo) + : ClientId(clientId) + , TabletId(tabletId) + , ForwardTo(forwardTo) {} + + void Bootstrap(const TActorContext& ctx) { + if (ForwardTo) { + ctx.Send(ForwardTo, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::EReplyStatus::OK, ClientId, ClientId, true, false, 1)); + Become(&TPipeActorMock::StateForward); + } else { + ctx.Send(ForwardTo, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::EReplyStatus::ERROR, ClientId, ClientId, true, false, 0)); + Die(ctx); + } + } + + private: + STFUNC(StateForward) { + auto ctx = TActivationContext::ActorContextFor(TActor::SelfId()); + ctx.Forward(ev, ForwardTo); + } + + private: + TActorId ClientId; + ui64 TabletId; + TActorId ForwardTo; + }; + + public: + static IActor* CreateClient(const TActorId& owner, ui64 tabletId, const TClientConfig& config = TClientConfig()) { + Y_UNUSED(config); + + auto it = Tablets.find(tabletId); + auto actorId = it == Tablets.end() ? TActorId() : it->second; + return new TPipeActorMock(owner, tabletId, actorId); + } + + static void Clear() { + Tablets.clear(); + } + + static void Register(ui64 tabletId, const TActorId& actorId) { + Tablets[tabletId] = actorId; + } + private: + static std::unordered_map Tablets; + }; + +} // namespace NTest + +} // namespace NKikimr::NTabletPipe diff --git a/ydb/core/persqueue/writer/source_id_encoding.cpp b/ydb/core/persqueue/writer/source_id_encoding.cpp index b4f4af39d544..6773cb58937e 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.cpp +++ b/ydb/core/persqueue/writer/source_id_encoding.cpp @@ -20,15 +20,15 @@ TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera return TStringBuilder() << "--!syntax_v1\n" "DECLARE $Hash AS Uint32; " "DECLARE $Topic AS Utf8; " - "DECLARE $SourceId AS Utf8; " - "SELECT Partition, CreateTime, AccessTime FROM `" << path << "` " + "DECLARE $SourceId AS Utf8;\n" + "SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" << path << "` " "WHERE Hash == $Hash AND Topic == $Topic AND SourceId == $SourceId;"; case ESourceIdTableGeneration::PartitionMapping: return TStringBuilder() << "--!syntax_v1\n" "DECLARE $Hash AS Uint64; " "DECLARE $Topic AS Utf8; " - "DECLARE $SourceId AS Utf8; " - "SELECT Partition, CreateTime, AccessTime FROM `" + "DECLARE $SourceId AS Utf8;\n" + "SELECT Partition, CreateTime, AccessTime, SeqNo FROM `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() << "` WHERE Hash == $Hash AND Topic == $Topic AND ProducerId == $SourceId;"; default: @@ -51,6 +51,36 @@ TString GetSelectSourceIdQuery(const TString& root, ESourceIdTableGeneration gen } TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration generation) { + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: + return TStringBuilder() << "--!syntax_v1\n" + "DECLARE $SourceId AS Utf8; " + "DECLARE $Topic AS Utf8; " + "DECLARE $Hash AS Uint32; " + "DECLARE $Partition AS Uint32; " + "DECLARE $CreateTime AS Uint64; " + "DECLARE $AccessTime AS Uint64;" + "DECLARE $SeqNo AS Uint64;\n" + "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition, SeqNo) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);"; + case ESourceIdTableGeneration::PartitionMapping: + return TStringBuilder() << "--!syntax_v1\n" + "DECLARE $SourceId AS Utf8; " + "DECLARE $Topic AS Utf8; " + "DECLARE $Hash AS Uint64; " + "DECLARE $Partition AS Uint32; " + "DECLARE $CreateTime AS Uint64; " + "DECLARE $AccessTime AS Uint64; " + "DECLARE $SeqNo AS Uint64;\n" + "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() + << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition, SeqNo) VALUES " + "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition, $SeqNo);"; + default: + Y_ABORT(); + } +} + +TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGeneration generation) { switch (generation) { case ESourceIdTableGeneration::SrcIdMeta2: return TStringBuilder() << "--!syntax_v1\n" @@ -60,8 +90,9 @@ TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera "DECLARE $Partition AS Uint32; " "DECLARE $CreateTime AS Uint64; " "DECLARE $AccessTime AS Uint64;\n" - "UPSERT INTO `" << path << "` (Hash, Topic, SourceId, CreateTime, AccessTime, Partition) VALUES " - "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; + "UPDATE `" << path << "` " + "SET AccessTime = $AccessTime " + "WHERE Hash = $Hash AND Topic = $Topic AND SourceId = $SourceId AND Partition = $Partition;"; case ESourceIdTableGeneration::PartitionMapping: return TStringBuilder() << "--!syntax_v1\n" "DECLARE $SourceId AS Utf8; " @@ -70,9 +101,9 @@ TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGenera "DECLARE $Partition AS Uint32; " "DECLARE $CreateTime AS Uint64; " "DECLARE $AccessTime AS Uint64;\n" - "UPSERT INTO `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() - << "` (Hash, Topic, ProducerId, CreateTime, AccessTime, Partition) VALUES " - "($Hash, $Topic, $SourceId, $CreateTime, $AccessTime, $Partition);"; + "UPDATE `" << NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath() << "` " + "SET AccessTime = $AccessTime " + "WHERE Hash = $Hash AND Topic = $Topic AND ProducerId = $SourceId AND Partition = $Partition;"; default: Y_ABORT(); } @@ -92,6 +123,20 @@ TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration gen } } +TString GetUpdateAccessTimeQuery(const TString& root, ESourceIdTableGeneration generation) { + switch (generation) { + case ESourceIdTableGeneration::SrcIdMeta2: + return GetUpdateAccessTimeQueryFromPath(root + "/SourceIdMeta2", generation); + case ESourceIdTableGeneration::PartitionMapping: + return GetUpdateAccessTimeQueryFromPath( + NGRpcProxy::V1::TSrcIdMetaInitManager::GetInstant()->GetStorageTablePath(), + generation + ); + default: + Y_ABORT(); + } +} + namespace NSourceIdEncoding { static const ui32 MURMUR_ARRAY_SEED = 0x9747b28c; diff --git a/ydb/core/persqueue/writer/source_id_encoding.h b/ydb/core/persqueue/writer/source_id_encoding.h index 99cdef19f65b..39b648786427 100644 --- a/ydb/core/persqueue/writer/source_id_encoding.h +++ b/ydb/core/persqueue/writer/source_id_encoding.h @@ -15,9 +15,11 @@ enum class ESourceIdTableGeneration { TString GetSelectSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); TString GetUpdateSourceIdQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); +TString GetUpdateAccessTimeQuery(const TString& root, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); TString GetSelectSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); TString GetUpdateSourceIdQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); +TString GetUpdateAccessTimeQueryFromPath(const TString& path, ESourceIdTableGeneration = ESourceIdTableGeneration::SrcIdMeta2); namespace NSourceIdEncoding { diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 1cc4ab2e9bc8..dc5bddf72ee0 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -1,3 +1,4 @@ +#include "common.h" #include "source_id_encoding.h" #include "util/generic/fwd.h" #include "writer.h" @@ -129,27 +130,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return ev; } - static bool BasicCheck(const NKikimrClient::TResponse& response, TString& error, bool mustHaveResponse = true) { - if (response.GetStatus() != NMsgBusProxy::MSTATUS_OK) { - error = TStringBuilder() << "Status is not ok" - << ": status# " << static_cast(response.GetStatus()); - return false; - } - - if (response.GetErrorCode() != NPersQueue::NErrorCode::OK) { - error = TStringBuilder() << "Error code is not ok" - << ": code# " << static_cast(response.GetErrorCode()); - return false; - } - - if (mustHaveResponse && !response.HasPartitionResponse()) { - error = "Absent partition response"; - return false; - } - - return true; - } - static NKikimrClient::TResponse MakeResponse(ui64 cookie) { NKikimrClient::TResponse response; response.MutablePartitionResponse()->SetCookie(cookie); @@ -326,6 +306,10 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return InitResult("Absent Ownership result", std::move(record)); } + if (NKikimrPQ::ETopicPartitionStatus::Active != response.GetCmdGetOwnershipResult().GetStatus()) { + return InitResult("Partition is inactive", std::move(record)); + } + OwnerCookie = response.GetCmdGetOwnershipResult().GetOwnerCookie(); GetMaxSeqNo(); } diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index 8ab13eff7797..e92ea1d7b8e5 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -110,6 +110,7 @@ message TPersQueuePartitionRequest { message TCmdGetOwnership { // get write ownership for partition optional string Owner = 1 [default = "default"]; optional bool Force = 2 [ default = true]; + optional bool RegisterIfNotExists = 3 [default = true]; } message TCmdReserveBytes { @@ -468,6 +469,8 @@ message TPersQueuePartitionResponse { message TCmdGetOwnershipResult { optional string OwnerCookie = 1; + optional NKikimrPQ.ETopicPartitionStatus Status = 2; + optional int64 SeqNo = 3; } message TCmdPrepareDirectReadResult { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 6abc3855e4f7..b47d3ff20ac4 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -962,6 +962,13 @@ message TEvSourceIdResponse { repeated TSource Source = 3; }; +message TEvCheckPartitionStatusRequest { + optional uint32 Partition = 1; +}; + +message TEvCheckPartitionStatusResponse { + optional ETopicPartitionStatus Status = 1; +}; message TTransaction { enum EKind { diff --git a/ydb/core/testlib/basics/ya.make b/ydb/core/testlib/basics/ya.make index 8d0f9802a9b1..642640447975 100644 --- a/ydb/core/testlib/basics/ya.make +++ b/ydb/core/testlib/basics/ya.make @@ -28,6 +28,9 @@ PEERDIR( ydb/core/util ydb/library/yql/minikql/invoke_builtins/llvm14 ydb/library/yql/public/udf/service/exception_policy + ydb/services/kesus + ydb/services/persqueue_cluster_discovery + ydb/services/ydb ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 717d653260f5..798795f3317b 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -599,6 +599,7 @@ class TFlatMsgBusPQClient : public NFlatTests::TFlatMsgBusClient { "Columns { Name: \"Partition\" Type: \"Uint32\"}" "Columns { Name: \"CreateTime\" Type: \"Uint64\"}" "Columns { Name: \"AccessTime\" Type: \"Uint64\"}" + "Columns { Name: \"SeqNo\" Type: \"Uint64\"}" "KeyColumnNames: [\"Hash\", \"SourceId\", \"Topic\"]" ); } diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make index 47c6c964d86d..e242c0b6b57b 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ya.make +++ b/ydb/core/tx/columnshard/splitter/ut/ya.make @@ -20,6 +20,9 @@ PEERDIR( ydb/library/yql/minikql/comp_nodes/llvm14 ydb/library/yql/public/udf/service/exception_policy ydb/library/yql/sql/pg + ydb/services/kesus + ydb/services/persqueue_cluster_discovery + ydb/services/ydb ) ADDINCL(