From 6e4be2610c700b319eddf9d2fffc697864faf57e Mon Sep 17 00:00:00 2001 From: drrtuy Date: Fri, 8 Nov 2024 17:47:23 +0000 Subject: [PATCH] fix(DEC): DEC now sends multiple acks with the correct number of msgs acked to facilitate ControlFlow using same node communication bypass. --- dbcon/joblist/distributedenginecomm.cpp | 74 ++++++++++++------------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/dbcon/joblist/distributedenginecomm.cpp b/dbcon/joblist/distributedenginecomm.cpp index b9a1579345..8f7127fbb7 100644 --- a/dbcon/joblist/distributedenginecomm.cpp +++ b/dbcon/joblist/distributedenginecomm.cpp @@ -1,5 +1,5 @@ /* Copyright (C) 2014 InfiniDB, Inc. - * Copyright (C) 2016-2022 MariaDB Corporation. + * Copyright (C) 2016-2024 MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -646,7 +646,6 @@ void DistributedEngineComm::read_some(uint32_t key, uint32_t divisor, vector& msgs, boost::shared_ptr mqe, size_t queueSize) { - ISMPacketHeader* ism; uint32_t l_msgCount = msgs.size(); /* If the current queue size > target, do nothing. @@ -699,44 +698,42 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (l_msgCount > 0) { - SBS msg(new ByteStream(sizeof(ISMPacketHeader))); - uint16_t* toAck; vector pmAcked(pmCount, false); - ism = (ISMPacketHeader*)msg->getInputPtr(); - // The only var checked by ReadThread is the Command var. The others - // are wasted space. We hijack the Size, & Flags fields for the - // params to the ACK msg. + { + SBS ackCommand(new ByteStream(sizeof(ISMPacketHeader))); - ism->Interleave = uniqueID; - ism->Command = BATCH_PRIMITIVE_ACK; - toAck = &ism->Size; + ISMPacketHeader* ism = reinterpret_cast(ackCommand->getInputPtr()); + // The only var checked by ReadThread is the Command var. The others + // are wasted space. We hijack the Size, & Flags fields for the + // params to the ACK msg. - msg->advanceInputPtr(sizeof(ISMPacketHeader)); - // There must be only one local connection here. - bool sendToLocal = false; - while (l_msgCount > 0) - { - /* could have to send up to pmCount ACKs */ - uint32_t sockIndex = 0; - - /* This will reset the ACK field in the Bytestream directly, and nothing - * else needs to change if multiple msgs are sent. */ - nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); - idbassert(*toAck <= l_msgCount); - l_msgCount -= *toAck; - if (sockIndex == localConnectionId_ && fIsExeMgr) + ism->Interleave = uniqueID; + ism->Command = BATCH_PRIMITIVE_ACK; + uint16_t* toAck = &ism->Size; + + ackCommand->advanceInputPtr(sizeof(ISMPacketHeader)); + // There must be only one local connection here. + while (l_msgCount > 0) { - sendToLocal = true; - continue; + /* could have to send up to pmCount ACKs */ + uint32_t sockIndex = 0; + + /* This will reset the ACK field in the Bytestream directly, and nothing + * else needs to change if multiple msgs are sent. */ + nextPMToACK(mqe, l_msgCount, &sockIndex, toAck); + idbassert(*toAck <= l_msgCount); + l_msgCount -= *toAck; + pmAcked[sockIndex] = true; + + if (sockIndex == localConnectionId_ && fIsExeMgr) + { + SBS sameNodeAckCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, *toAck); + writeToClient(sockIndex, sameNodeAckCommand); + continue; + } + writeToClient(sockIndex, ackCommand); } - pmAcked[sockIndex] = true; - writeToClient(sockIndex, msg); - } - if (sendToLocal) - { - pmAcked[localConnectionId_] = true; - writeToClient(localConnectionId_, msg); } // @bug4436, when no more unacked work, send an ack to all PMs that haven't been acked. @@ -751,8 +748,9 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, if (totalUnackedWork == 0) { - *toAck = 1; - + // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. + // It is safe to re-use it because the `ACK` command is sent to all Primitive Servers. + SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1); for (uint32_t i = 0; i < pmCount; ++i) { if (!pmAcked[i]) @@ -761,14 +759,12 @@ void DistributedEngineComm::sendAcks(uint32_t uniqueID, const vector& msgs, { continue; } - // MCOL-5637 Initialize a new bytestream before send a `ACK` command to Primitive Server. - SBS ackCommand = createBatchPrimitiveCommand(BATCH_PRIMITIVE_ACK, uniqueID, 1); writeToClient(i, ackCommand); } } if (!pmAcked[localConnectionId_] && fIsExeMgr) { - writeToClient(localConnectionId_, msg); + writeToClient(localConnectionId_, ackCommand); } } }