Skip to content

Commit

Permalink
Choose partition for topic split/merge (ydb-platform#1268)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Jan 25, 2024
1 parent a880602 commit c3a77e3
Show file tree
Hide file tree
Showing 37 changed files with 2,134 additions and 761 deletions.
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
UNITTEST_FOR(ydb/core/kafka_proxy)

#SIZE(medium)
SIZE(medium)

SRCS(
ut_kafka_functions.cpp
Expand Down
18 changes: 17 additions & 1 deletion ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ struct TEvPQ {
EvCacheProxyForgetRead,
EvGetFullDirectReadData,
EvProvideDirectReadInfo,
EvCheckPartitionStatusRequest,
EvCheckPartitionStatusResponse,
EvEnd
};

Expand Down Expand Up @@ -491,19 +493,21 @@ struct TEvPQ {
};

struct TEvChangeOwner : public TEventLocal<TEvChangeOwner, EvChangeOwner> {
explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force)
explicit TEvChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const TActorId& sender, const bool force, const bool registerIfNotExists = true)
: Cookie(cookie)
, Owner(owner)
, PipeClient(pipeClient)
, Sender(sender)
, Force(force)
, RegisterIfNotExists(registerIfNotExists)
{}

ui64 Cookie;
TString Owner;
TActorId PipeClient;
TActorId Sender;
bool Force;
bool RegisterIfNotExists;
};

struct TEvPipeDisconnected : public TEventLocal<TEvPipeDisconnected, EvPipeDisconnected> {
Expand Down Expand Up @@ -989,6 +993,18 @@ struct TEvPQ {
struct TEvProvideDirectReadInfo : public TEventLocal<TEvProvideDirectReadInfo, EvProvideDirectReadInfo> {
};

struct TEvCheckPartitionStatusRequest : public TEventPB<TEvCheckPartitionStatusRequest, NKikimrPQ::TEvCheckPartitionStatusRequest, EvCheckPartitionStatusRequest> {
TEvCheckPartitionStatusRequest() = default;

TEvCheckPartitionStatusRequest(ui32 partitionId) {
Record.SetPartition(partitionId);
}
};

struct TEvCheckPartitionStatusResponse : public TEventPB<TEvCheckPartitionStatusResponse, NKikimrPQ::TEvCheckPartitionStatusResponse, EvCheckPartitionStatusResponse> {
};


};

} //NKikimr
21 changes: 20 additions & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ void TPartition::EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& conf
{
Config = config;
PartitionConfig = GetPartitionConfig(Config, Partition);
PartitionGraph.Rebuild(Config);
PartitionGraph = MakePartitionGraph(Config);
TopicConverter = topicConverter;
NewPartition = false;

Expand Down Expand Up @@ -2616,4 +2616,23 @@ void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext
Send(ev->Sender, response.Release());
}

void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

if (Partition != record.GetPartition()) {
LOG_INFO_S(
ctx, NKikimrServices::PERSQUEUE,
"TEvCheckPartitionStatusRequest for wrong partition " << record.GetPartition() << "." <<
" Topic: \"" << TopicName() << "\"." <<
" Partition: " << Partition << "."
);
return;
}

auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);

Send(ev->Sender, response.Release());
}

} // namespace NKikimr::NPQ
5 changes: 4 additions & 1 deletion ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class TPartition : public TActorBootstrapped<TPartition> {

void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
void ReplyOk(const TActorContext& ctx, const ui64 dst);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);

void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);

Expand Down Expand Up @@ -344,6 +344,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
// void DestroyReadSession(const TReadSessionKey& key);

void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);

TString LogPrefix() const;

Expand Down Expand Up @@ -481,6 +482,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
Expand Down Expand Up @@ -538,6 +540,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon
case NKikimrProto::NODATA:
Partition()->Config = Partition()->TabletConfig;
Partition()->PartitionConfig = GetPartitionConfig(Partition()->Config, Partition()->Partition);
Partition()->PartitionGraph.Rebuild(Partition()->Config);
Partition()->PartitionGraph = MakePartitionGraph(Partition()->Config);
break;

case NKikimrProto::ERROR:
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/persqueue/partition_sourcemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace NKikimr::NPQ {

IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId partition, ui64 tabletId);
bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node);
bool IsResearchRequires(const TPartitionGraph::Node* node);

//
// TPartitionSourceManager
Expand Down Expand Up @@ -37,7 +37,7 @@ void TPartitionSourceManager::ScheduleBatch() {

PendingSourceIds = std::move(UnknownSourceIds);

for(const auto* parent : node.value()->HierarhicalParents) {
for(const auto* parent : node->HierarhicalParents) {
PendingCookies.insert(++Cookie);

TActorId actorId = PartitionRequester(parent->Id, parent->TabletId);
Expand Down Expand Up @@ -141,7 +141,7 @@ void TPartitionSourceManager::Handle(TEvPQ::TEvSourceIdResponse::TPtr& ev, const
}
}

TPartitionSourceManager::TPartitionNode TPartitionSourceManager::GetPartitionNode() const {
const TPartitionSourceManager::TPartitionNode* TPartitionSourceManager::GetPartitionNode() const {
return Partition.PartitionGraph.GetPartition(Partition.Partition);
}

Expand Down Expand Up @@ -185,7 +185,7 @@ TSourceIdStorage& TPartitionSourceManager::GetSourceIdStorage() const {

bool TPartitionSourceManager::HasParents() const {
auto node = Partition.PartitionGraph.GetPartition(Partition.Partition);
return node && !node.value()->Parents.empty();
return node && !node->Parents.empty();
}

TActorId TPartitionSourceManager::PartitionRequester(TPartitionId id, ui64 tabletId) {
Expand Down Expand Up @@ -484,8 +484,8 @@ IActor* CreateRequester(TActorId parent, TPartitionSourceManager::TPartitionId p
return new TSourceIdRequester(parent, partition, tabletId);
}

bool IsResearchRequires(std::optional<const TPartitionGraph::Node*> node) {
return node && !node.value()->Parents.empty();
bool IsResearchRequires(const TPartitionGraph::Node* node) {
return node && !node->Parents.empty();
}

NKikimrPQ::TEvSourceIdResponse::EState Convert(TSourceIdInfo::EState value) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition_sourcemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TPartition;

class TPartitionSourceManager {
private:
using TPartitionNode = std::optional<const TPartitionGraph::Node *>;
using TPartitionNode = TPartitionGraph::Node;

public:
using TPartitionId = ui32;
Expand Down Expand Up @@ -96,7 +96,7 @@ class TPartitionSourceManager {
private:
TPartitionSourceManager& Manager;

TPartitionNode Node;
const TPartitionNode* Node;
TSourceIdWriter SourceIdWriter;
THeartbeatEmitter HeartbeatEmitter;
};
Expand Down Expand Up @@ -125,7 +125,7 @@ class TPartitionSourceManager {
void FinishBatch(const TActorContext& ctx);
bool RequireEnqueue(const TString& sourceId);

TPartitionNode GetPartitionNode() const;
const TPartitionNode* GetPartitionNode() const;
TSourceIdStorage& GetSourceIdStorage() const;
bool HasParents() const;

Expand Down
23 changes: 17 additions & 6 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ static const ui32 MAX_INLINE_SIZE = 1000;

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;

void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie) {
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);

THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
NKikimrClient::TResponse& resp = *response->Response;
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult()->SetOwnerCookie(cookie);
auto* r = resp.MutablePartitionResponse()->MutableCmdGetOwnershipResult();
r->SetOwnerCookie(cookie);
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
r->SetSeqNo(seqNo);

ctx.Send(Tablet, response.Release());
}

Expand Down Expand Up @@ -146,8 +150,12 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
auto &owner = ev->Owner;
auto it = Owners.find(owner);
if (it == Owners.end()) {
Owners[owner];
it = Owners.find(owner);
if (ev->RegisterIfNotExists) {
Owners[owner];
it = Owners.find(owner);
} else {
return ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED, "SourceId isn't registered");
}
}
if (it->second.NeedResetOwner || ev->Force) { //change owner
Y_ABORT_UNLESS(ReservedSize >= it->second.ReservedSize);
Expand Down Expand Up @@ -346,10 +354,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
if (!already && partNo + 1 == totalParts && !writeResponse.Msg.HeartbeatVersion)
++offset;
} else if (response.IsOwnership()) {
const TString& ownerCookie = response.GetOwnership().OwnerCookie;
const auto& r = response.GetOwnership();
const TString& ownerCookie = r.OwnerCookie;
auto it = Owners.find(TOwnerInfo::GetOwnerFromOwnerCookie(ownerCookie));
if (it != Owners.end() && it->second.OwnerCookie == ownerCookie) {
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie);
auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(it->first));
auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds().end() ? 0 : sit->second.SeqNo;
ReplyOwnerOk(ctx, response.GetCookie(), ownerCookie, seqNo);
} else {
ReplyError(ctx, response.GetCookie(), NPersQueue::NErrorCode::WRONG_COOKIE, "new GetOwnership request is dropped already");
}
Expand Down
35 changes: 34 additions & 1 deletion ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,6 +1230,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
}

ProcessSourceIdRequests(partitionId);
ProcessCheckPartitionStatusRequests(partitionId);
if (allInitialized) {
SourceIdRequests.clear();
}
Expand Down Expand Up @@ -2048,7 +2049,8 @@ void TPersQueue::HandleGetOwnershipRequest(const ui64 responseCookie, const TAct
it->second = TPipeInfo::ForOwner(partActor, owner, it->second.ServerActors);

InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_GET_OWNERSHIP);
THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender, req.GetCmdGetOwnership().GetForce());
THolder<TEvPQ::TEvChangeOwner> event = MakeHolder<TEvPQ::TEvChangeOwner>(responseCookie, owner, pipeClient, sender,
req.GetCmdGetOwnership().GetForce(), req.GetCmdGetOwnership().GetRegisterIfNotExists());
ctx.Send(partActor, event.Release());
}

Expand Down Expand Up @@ -3915,6 +3917,37 @@ void TPersQueue::ProcessSourceIdRequests(ui32 partitionId) {
}
}

void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
auto it = Partitions.find(record.GetPartition());
if (it == Partitions.end()) {
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Unknown partition " << record.GetPartition());

auto response = THolder<TEvPQ::TEvCheckPartitionStatusResponse>();
response->Record.SetStatus(NKikimrPQ::ETopicPartitionStatus::Deleted);
Send(ev->Sender, response.Release());

return;
}

if (it->second.InitDone) {
Forward(ev, it->second.Actor);
} else {
CheckPartitionStatusRequests[record.GetPartition()].push_back(ev);
}
}

void TPersQueue::ProcessCheckPartitionStatusRequests(ui32 partitionId) {
auto sit = CheckPartitionStatusRequests.find(partitionId);
if (sit != CheckPartitionStatusRequests.end()) {
auto it = Partitions.find(partitionId);
for (auto& r : sit->second) {
Forward(r, it->second.Actor);
}
CheckPartitionStatusRequests.erase(partitionId);
}
}

TString TPersQueue::LogPrefix() const {
return TStringBuilder() << SelfId() << " ";
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
void ProcessSourceIdRequests(ui32 partitionId);

void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
void ProcessCheckPartitionStatusRequests(ui32 partitionId);

TString LogPrefix() const;

static constexpr const char * KeyConfig() { return "_config"; }
Expand Down Expand Up @@ -405,6 +408,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
bool UseMediatorTimeCast = true;

THashMap<ui32, TVector<TEvPQ::TEvSourceIdRequest::TPtr>> SourceIdRequests;
THashMap<ui32, TVector<TEvPQ::TEvCheckPartitionStatusRequest::TPtr>> CheckPartitionStatusRequests;
TMaybe<ui64> TabletGeneration;
};

Expand Down
9 changes: 4 additions & 5 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,24 +147,23 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TConfigTrans
TabletConfig = txBody.GetTabletConfig();
BootstrapConfig = txBody.GetBootstrapConfig();

TPartitionGraph graph;
graph.Rebuild(TabletConfig);
TPartitionGraph graph = MakePartitionGraph(TabletConfig);

for (const auto& p : TabletConfig.GetPartitions()) {
auto node = graph.GetPartition(p.GetPartitionId());
if (!node) {
// Old configuration format without AllPartitions. Split/Merge is not supported.
continue;
}
if (node.value()->Children.empty()) {
for (const auto* r : node.value()->Parents) {
if (node->Children.empty()) {
for (const auto* r : node->Parents) {
if (extractTabletId != r->TabletId) {
Senders.insert(r->TabletId);
}
}
}

for (const auto* r : node.value()->Children) {
for (const auto* r : node->Children) {
if (r->Children.empty()) {
if (extractTabletId != r->TabletId) {
Receivers.insert(r->TabletId);
Expand Down
Loading

0 comments on commit c3a77e3

Please sign in to comment.