Skip to content

Commit

Permalink
fix(DEC): DEC now sends multiple acks with the correct number of msgs…
Browse files Browse the repository at this point in the history
… acked to facilitate ControlFlow using same node communication bypass.
  • Loading branch information
drrtuy committed Nov 8, 2024
1 parent b124e83 commit 6e4be26
Showing 1 changed file with 35 additions and 39 deletions.
74 changes: 35 additions & 39 deletions dbcon/joblist/distributedenginecomm.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* Copyright (C) 2014 InfiniDB, Inc.
* Copyright (C) 2016-2022 MariaDB Corporation.
* Copyright (C) 2016-2024 MariaDB Corporation.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
Expand Down Expand Up @@ -646,7 +646,6 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector<SBS
void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs, boost::shared_ptr<MQE> mqe,
size_t queueSize)
{
ISMPacketHeader* ism;
uint32_t l_msgCount = msgs.size();

/* If the current queue size > target, do nothing.
Expand Down Expand Up @@ -699,44 +698,42 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,

if (l_msgCount > 0)
{
SBS msg(new ByteStream(sizeof(ISMPacketHeader)));
uint16_t* toAck;
vector<bool> pmAcked(pmCount, false);

ism = (ISMPacketHeader*)msg->getInputPtr();
// The only var checked by ReadThread is the Command var. The others
// are wasted space. We hijack the Size, & Flags fields for the
// params to the ACK msg.
{
SBS ackCommand(new ByteStream(sizeof(ISMPacketHeader)));

ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
toAck = &ism->Size;
ISMPacketHeader* ism = reinterpret_cast<ISMPacketHeader*>(ackCommand->getInputPtr());
// The only var checked by ReadThread is the Command var. The others
// are wasted space. We hijack the Size, & Flags fields for the
// params to the ACK msg.

msg->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
bool sendToLocal = false;
while (l_msgCount > 0)
{
/* could have to send up to pmCount ACKs */
uint32_t sockIndex = 0;

/* This will reset the ACK field in the Bytestream directly, and nothing
* else needs to change if multiple msgs are sent. */
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
if (sockIndex == localConnectionId_ && fIsExeMgr)
ism->Interleave = uniqueID;
ism->Command = BATCH_PRIMITIVE_ACK;
uint16_t* toAck = &ism->Size;

ackCommand->advanceInputPtr(sizeof(ISMPacketHeader));
// There must be only one local connection here.
while (l_msgCount > 0)
{
sendToLocal = true;
continue;
/* could have to send up to pmCount ACKs */
uint32_t sockIndex = 0;

/* This will reset the ACK field in the Bytestream directly, and nothing
* else needs to change if multiple msgs are sent. */
nextPMToACK(mqe, l_msgCount, &sockIndex, toAck);
idbassert(*toAck <= l_msgCount);
l_msgCount -= *toAck;
pmAcked[sockIndex] = true;

if (sockIndex == localConnectionId_ && fIsExeMgr)
{
SBS sameNodeAckCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, *toAck);
writeToClient(sockIndex, sameNodeAckCommand);
continue;
}
writeToClient(sockIndex, ackCommand);
}
pmAcked[sockIndex] = true;
writeToClient(sockIndex, msg);
}
if (sendToLocal)
{
pmAcked[localConnectionId_] = true;
writeToClient(localConnectionId_, msg);
}

// @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked.
Expand All @@ -751,8 +748,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,

if (totalUnackedWork == 0)
{
*toAck = 1;

// MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server.
// It is safe to re-use it because the `ACK` command is sent to all Primitive Servers.
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
for (uint32_t i = 0; i < pmCount; ++i)
{
if (!pmAcked[i])
Expand All @@ -761,14 +759,12 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector<SBS>& msgs,
{
continue;
}
// MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server.
SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1);
writeToClient(i, ackCommand);
}
}
if (!pmAcked[localConnectionId_] && fIsExeMgr)
{
writeToClient(localConnectionId_, msg);
writeToClient(localConnectionId_, ackCommand);
}
}
}
Expand Down

0 comments on commit 6e4be26

Please sign in to comment.