Kafka connect distributed #12960
void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
    const TString createSessionError = "Failed to create KQP session";
Also take the error text from the Issues inside the response.
bool TKqpTxHelper::HandleCreateSessionResponse(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext&) {
    const auto& record = ev->Get()->Record;

    if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
Add logs here
void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
    const TString kqpQueryError = "KQP query error";
    if (ev->Cookie != KqpReqCookie) {
        KAFKA_LOG_CRIT("Unexpected cookie in TEvQueryResponse");
Is CRIT really the right level? Can this situation actually occur?
const auto& record = ev->Get()->Record;
auto status = record.GetYdbStatus();
auto kafkaErr = KqpStatusToKafkaError(status);
Fill kqpQueryError from the status and the status issues.
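To make the request concrete, here is a minimal sketch of composing the error text from a status name and its issue messages. The helper name BuildKqpErrorText and the plain-string inputs are assumptions for illustration; in the actor the status and issues would come from the response record, and how they are read there is not shown in this diff.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not from the PR): builds one error string from a
// fixed prefix, a status name, and zero or more issue messages.
std::string BuildKqpErrorText(const std::string& prefix,
                              const std::string& statusName,
                              const std::vector<std::string>& issues) {
    std::string text = prefix + ": status " + statusName;
    for (const auto& issue : issues) {
        // Each issue is appended after a separator so none is lost.
        text += "; " + issue;
    }
    return text;
}
```

With something like this, the message sent back to the client carries the actual cause instead of the fixed "KQP query error" string.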
if (kafkaErr != EKafkaErrors::NONE_ERROR) {
    switch (RequestType) {
        case JOIN_GROUP:
            SendJoinGroupResponseFail(ctx, CorellationId, kafkaErr, kqpQueryError);
Refactor this code: add a SendResponseFail() function with the switch on RequestType inside it. Otherwise this is copy-paste.
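A rough sketch of the shape this refactoring could take. The class, the enum values other than JOIN_GROUP, and the recorded strings are stand-ins invented for illustration; only the idea of a single SendResponseFail() that owns the switch comes from the comment above.

```cpp
#include <string>
#include <vector>

// Stand-in enums; the real ones live in the Kafka proxy code.
enum class ERequestType { JOIN_GROUP, SYNC_GROUP, HEARTBEAT };
enum class EKafkaErrors { NONE_ERROR, UNKNOWN_SERVER_ERROR, INVALID_REQUEST };

class TBalancerSketch {
public:
    explicit TBalancerSketch(ERequestType requestType)
        : RequestType(requestType) {}

    // Single entry point: the switch on RequestType lives only here,
    // so handlers call SendResponseFail() instead of repeating it.
    void SendResponseFail(EKafkaErrors error, const std::string& message) {
        switch (RequestType) {
            case ERequestType::JOIN_GROUP:
                SendJoinGroupResponseFail(error, message);
                break;
            case ERequestType::SYNC_GROUP:
                SendSyncGroupResponseFail(error, message);
                break;
            case ERequestType::HEARTBEAT:
                SendHeartbeatResponseFail(error, message);
                break;
            default:
                // Catches a request type added later but not handled here.
                Sent.push_back("Unknown: " + message);
                break;
        }
    }

    // Records what would be sent; the real actor would reply to the client.
    std::vector<std::string> Sent;

private:
    void SendJoinGroupResponseFail(EKafkaErrors, const std::string& m) { Sent.push_back("JoinGroup: " + m); }
    void SendSyncGroupResponseFail(EKafkaErrors, const std::string& m) { Sent.push_back("SyncGroup: " + m); }
    void SendHeartbeatResponseFail(EKafkaErrors, const std::string& m) { Sent.push_back("Heartbeat: " + m); }

    ERequestType RequestType;
};
```

Each handler then reduces to one call such as SendResponseFail(kafkaErr, kqpQueryError).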
LIMIT 1;
)";

const TString INSERT_MEMBER_AND_SELECT_MASTER = R"(
The code uses INSERT_MEMBER_AND_SELECT_MASTER_QUERY, but there is no _QUERY suffix here.
You should also check that visibility of the transaction's own changes is enabled. What if the member inserted by this request has the minimum join time?
$Generation,
$MemberId,
CurrentUtcDateTime(),
CurrentUtcDateTime() + Interval("PT5S"),
WTF? Pass all dates as parameters. No hardcoded 5 seconds in YQL.
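One way to read this: the actor computes both time values itself and binds them as query parameters, so the query text holds no clock calls and no literal interval. The sketch below shows only that computation; the type and function names are hypothetical, and the timeout is assumed to come from the client (the Kafka JoinGroup request carries a session timeout in milliseconds).

```cpp
#include <chrono>

// Millisecond-precision wall-clock time point used by this sketch.
using TTimePoint = std::chrono::time_point<std::chrono::system_clock,
                                           std::chrono::milliseconds>;

// Hypothetical holder for the two values that would be bound as parameters
// instead of CurrentUtcDateTime() and CurrentUtcDateTime() + Interval("PT5S").
struct THeartbeatTimes {
    TTimePoint LastHeartbeat;  // "now" as seen by the actor
    TTimePoint Deadline;       // now + timeout supplied by the client
};

// Computes both values once, from a single "now", so they stay consistent.
THeartbeatTimes MakeHeartbeatTimes(TTimePoint now,
                                   std::chrono::milliseconds sessionTimeout) {
    return THeartbeatTimes{now, now + sessionTimeout};
}
```

Taking "now" as an argument also makes the logic testable with a fixed clock.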
if (IsMaster) {
    auto wakeup = std::make_unique<TEvents::TEvWakeup>(0);
    ctx.ActorSystem()->Schedule(
        TDuration::Seconds(WAKE_UP_DELAY_SECONDS),
Why a constant? Take this value from the client.
void TKafkaBalancerActor::Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx) {
    const TString createSessionError = "Failed to create KQP session";
    if (!Kqp->HandleCreateSessionResponse(ev, ctx)) {
If the error is TLI (transaction locks invalidated), maybe you should restart the action from the beginning? Responding with an error is also a possible solution, though. Create an issue for this.
DECLARE $Database AS Utf8;
DECLARE $Master AS Utf8;

INSERT INTO `/Root/.metadata/kafka_consumer_groups`
Change /Root to the correct database or remove it. See how this is done in WriteSession when choosing the partition to write to.
$Generation,
$State,
$Database,
CurrentUtcDateTime(),
Please move the timestamp into the parameters.
SET
    state = $State,
    generation = $Generation,
    last_heartbeat_time = CurrentUtcDateTime(),
All values must be in parameters.
if (status == Ydb::StatusIds::SUCCESS) {
    return EKafkaErrors::NONE_ERROR;
}
return EKafkaErrors::UNKNOWN_SERVER_ERROR;
Will there be further work here? Or is it impossible to translate these errors into specific Kafka errors?
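For discussion, a sketch of what a finer-grained translation might look like. Both enums are local stand-ins, and the particular pairings are illustrative guesses rather than anything this PR establishes; whether each target code fits the consumer group protocol here would need to be checked.

```cpp
// Stand-in for a subset of YDB status codes.
enum class EYdbStatus {
    SUCCESS, BAD_REQUEST, UNAUTHORIZED, ABORTED,
    UNAVAILABLE, OVERLOADED, TIMEOUT, INTERNAL_ERROR
};

// Stand-in for a subset of Kafka protocol error codes.
enum class EKafkaErrors {
    NONE_ERROR, UNKNOWN_SERVER_ERROR, INVALID_REQUEST,
    REQUEST_TIMED_OUT, COORDINATOR_NOT_AVAILABLE, GROUP_AUTHORIZATION_FAILED
};

// Hypothetical mapping: a few statuses get a specific Kafka error,
// everything else still falls back to UNKNOWN_SERVER_ERROR.
EKafkaErrors KqpStatusToKafkaErrorSketch(EYdbStatus status) {
    switch (status) {
        case EYdbStatus::SUCCESS:
            return EKafkaErrors::NONE_ERROR;
        case EYdbStatus::BAD_REQUEST:
            return EKafkaErrors::INVALID_REQUEST;
        case EYdbStatus::UNAUTHORIZED:
            return EKafkaErrors::GROUP_AUTHORIZATION_FAILED;
        case EYdbStatus::TIMEOUT:
            return EKafkaErrors::REQUEST_TIMED_OUT;
        case EYdbStatus::UNAVAILABLE:
        case EYdbStatus::OVERLOADED:
            return EKafkaErrors::COORDINATOR_NOT_AVAILABLE;
        default:
            return EKafkaErrors::UNKNOWN_SERVER_ERROR;
    }
}
```

Retriable codes matter most here, since Kafka clients decide whether to retry based on the error returned.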
void TKafkaBalancerActor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr&, const TActorContext& ctx) {
    TablesInited++;
    if (TablesInited == 2) {
Magic number here. Please replace it with a named constant to improve readability.
        HandleHeartbeatResponse(ev, ctx);
        break;
    default:
        KAFKA_LOG_CRIT("Unknown RequestType in TEvCreateSessionResponse");
Maybe we should add this log line to every other switch as well? It would catch the case where we add a new request type but forget to handle it in some switch.
auto& record = ev->Get()->Record;
auto& resp = record.GetResponse();
if (resp.GetYdbResults().empty()) {
    outGroupExists = false;
Maybe return a composite object with outGroupExists, outGeneration, outState, outMasterId and outTtl fields instead of modifying the arguments? Output arguments feel like an antipattern to me (in Java they certainly are; in C++ I'm not sure). Ready to discuss)
The returned struct could have a meaningful name such as GroupStatus (if I understood the code correctly). It would greatly improve readability.
I found myself searching the code for what the state variable means in this context, and I couldn't find it. This new domain object (say, GroupStatus) could also have a comment on every field to make the code easier to navigate.
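A sketch of how the suggested GroupStatus object might look, using std::optional to separate "could not parse" from "parsed, group absent". The field types, the TGroupRow input, and the rule that more than one row counts as malformed are all assumptions made for the example; the real function parses a KQP query response.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, already-decoded result row (stand-in for the KQP result set).
struct TGroupRow {
    uint64_t Generation;
    uint64_t State;
    std::string MasterId;
    uint64_t Ttl;
};

// Replaces the out-parameters with one documented value object.
struct TGroupStatus {
    bool Exists = false;      // false when the query returned no rows
    uint64_t Generation = 0;  // current generation of the consumer group
    uint64_t State = 0;       // group state code, e.g. GROUP_STATE_SYNC
    std::string MasterId;     // member id of the group master
    uint64_t Ttl = 0;         // group TTL; type and unit are not visible in the diff
};

// Returns std::nullopt for a malformed response, otherwise a filled status.
std::optional<TGroupStatus> ParseGroupStatus(const std::vector<TGroupRow>& rows) {
    if (rows.size() > 1) {
        return std::nullopt;  // assumption: at most one row per group
    }
    TGroupStatus status;
    if (rows.empty()) {
        return status;        // parsed fine, the group just does not exist
    }
    const TGroupRow& row = rows.front();
    status.Exists = true;
    status.Generation = row.Generation;
    status.State = row.State;
    status.MasterId = row.MasterId;
    status.Ttl = row.Ttl;
    return status;
}
```

A call site then reads as `auto status = ParseGroupStatus(rows); if (!status || !status->Exists) { ... }`, with no argument being silently modified.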
bool TKafkaBalancerActor::ParseAssignments(
    NKqp::TEvKqp::TEvQueryResponse::TPtr ev,
    TString& assignments)
Same here. I'd suggest returning the assignments string as the function's return value. Ready to discuss)
member.MemberId = mId;
member.MetaStr = meta;
member.Metadata = member.MetaStr;
TBuffer buffer(member.Metadata.value().data() + sizeof(TKafkaVersion), member.Metadata.value().size_bytes() - sizeof(TKafkaVersion));
How is this buffer used here?
if (!ParseCheckStateAndGeneration(ev, groupExists, generation, state, masterId, groupTtl) ||
    !groupExists || generation != GenerationId || state != GROUP_STATE_SYNC) { //
    SendSyncGroupResponseFail(ctx, CorrelationId,
        EKafkaErrors::UNKNOWN_SERVER_ERROR,
Maybe INVALID_REQUEST is better here, because its error message suggests that the user check the broker logs.
KqpReqCookie++;
NYdb::TParamsBuilder params;
params.AddParam("$ConsumerGroup").Utf8(GroupId).Build();
params.AddParam("$Database").Utf8(Kqp->DataBase).Build();
params.AddParam("$Generation").Uint64(GenerationId).Build();
params.AddParam("$State").Uint64(GROUP_STATE_WORKING).Build();

if (SyncGroupRequestData->Assignments.size() == 0) {
    SendSyncGroupResponseFail(ctx, CorrelationId, EKafkaErrors::INVALID_REQUEST);
    PassAway();
    return;
}

auto& assignmentList = params.AddParam("$Assignments").BeginList();
for (auto& assignment: SyncGroupRequestData->Assignments) {
    assignmentList.AddListItem()
        .BeginStruct()
        .AddMember("MemberId").Utf8(assignment.MemberId.value())
        .AddMember("Assignment").String(TString(assignment.Assignment.value().data(),
            assignment.Assignment.value().size()))
        .EndStruct();
}
assignmentList.EndList().Build();
Kqp->SendYqlRequest(UPSERT_ASSIGNMENTS_AND_SET_WORKING_STATE, params.Build(), KqpReqCookie, ctx);
I'd suggest extracting the building and sending of the KQP request into a separate method (like SendUpsertAssignmentsAndSetWorkingStateKqpRequest) to improve readability.
*/

static const TString SUPPORTED_ASSIGN_STRATEGY = "roundrobin";
A reminder not to forget to deprecate this constant before committing (or to support a feature-flag-enabled migration).
ReadSessionActorId = ctx.RegisterWithSameMailbox(CreateKafkaReadSessionActor(Context, 0));

void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TJoinGroupRequestData>& message, const TActorContext& /*ctx*/) {
    if (ReadSessionActorId) {
A reminder not to forget to add the feature flag dependency here.