Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(DEC): DEC now sends multiple ACKs, each with the correct number of messages acked, to facilitate ControlFlow using the same-node communication bypass. #3345

Open
wants to merge 1 commit into
base: develop-23.02
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 35 additions & 39 deletions dbcon/joblist/distributedenginecomm.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016-2022 MariaDB Corporation.
* Copyright (C) 2016-2024 MariaDB Corporation.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -646,7 +646,6 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs, boost::shared_ptr<MQE> mqe,
size_t queueSize)
{
ISMPacketHeader* ism;
uint32_t l_msgCount = msgs.size();

/* If the current queue size > target, do nothing.
Expand Down Expand Up @@ -699,44 +698,42 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,

if (l_msgCount > 0)
{
SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
uint16_t* toAck;
vector<bool> pmAcked(pmCount, false);

ism = (ISMPacketHeader*)msg->getInputPtr();
// The only var checked by ReadThread is the Command var. The others
// are wasted space. We hijack the Size, & Flags fields for the
// params to the ACK msg.
{
SBS ackCommand(new ByteStream(sizeof(ISMPacketHeader)));

ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
toAck = &ism->Size;
ISMPacketHeader* ism = reinterpret_cast<ISMPacketHeader*>(ackCommand->getInputPtr());
// The only var checked by ReadThread is the Command var. The others
// are wasted space. We hijack the Size, & Flags fields for the
// params to the ACK msg.

msg->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
bool sendToLocal = false;
while (l_msgCount > 0)
{
/* could have to send up to pmCount ACKs */
uint32_t sockIndex = 0;

/* This will reset the ACK field in the Bytestream directly, and nothing
* else needs to change if multiple msgs are sent. */
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId_ && fIsExeMgr)
ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
uint16_t* toAck = &ism->Size;

ackCommand->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
while (l_msgCount > 0)
{
sendToLocal = true;
continue;
/* could have to send up to pmCount ACKs */
uint32_t sockIndex = 0;

/* This will reset the ACK field in the Bytestream directly, and nothing
* else needs to change if multiple msgs are sent. */
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
pmAcked[sockIndex] = true;

if (sockIndex == localConnectionId_ && fIsExeMgr)
{
SBS sameNodeAckCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, *toAck);
writeToClient(sockIndex, sameNodeAckCommand);
continue;
}
writeToClient(sockIndex, ackCommand);
}
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
if (sendToLocal)
{
pmAcked[localConnectionId_] = true;
writeToClient(localConnectionId_, msg);
}

// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
Expand All @@ -751,8 +748,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,

if (totalUnackedWork == 0)
{
*toAck = 1;

// MCOL-5637 Initialize a new bytestream before sending an `ACK` command to the Primitive Server.
// It is safe to re-use it because the same `ACK` command is sent to all Primitive Servers.
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
for (uint32_t i = 0; i < pmCount; ++i)
{
if (!pmAcked[i])
Expand All @@ -761,14 +759,12 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{
continue;
}
// MCOL-5637 Initialize a new bytestream before sending an `ACK` command to the Primitive Server.
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
writeToClient(i, ackCommand);
}
}
if (!pmAcked[localConnectionId_] && fIsExeMgr)
{
writeToClient(localConnectionId_, msg);
writeToClient(localConnectionId_, ackCommand);
}
}
}
Expand Down