Skip to content

Commit

Permalink
Merge pull request #18874 from taosdata/fix/TD-21117
Browse files Browse the repository at this point in the history
fix: control rpc qitem memory
  • Loading branch information
guanshengliang authored Dec 10, 2022
2 parents ac389db + 652f51f commit aa512f1
Show file tree
Hide file tree
Showing 17 changed files with 45 additions and 40 deletions.
3 changes: 2 additions & 1 deletion include/util/tqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ typedef struct STaosQnode {
STaosQnode *next;
STaosQueue *queue;
int64_t timestamp;
int64_t dataSize;
int32_t size;
int8_t itype;
int8_t reserved[3];
Expand Down Expand Up @@ -103,7 +104,7 @@ typedef struct STaosQall {
STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue);
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp);
void *taosAllocateQitem(int32_t size, EQItype itype);
void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize);
void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
Expand Down
12 changes: 6 additions & 6 deletions source/client/src/clientTmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ void tmqAssignAskEpTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__ASK_EP;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
Expand All @@ -703,7 +703,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__COMMIT;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
Expand All @@ -715,7 +715,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq != NULL) {
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
*pTaskType = TMQ_DELAYED_TASK__REPORT;
taosWriteQitem(tmq->delayedTask, pTaskType);
tsem_post(&tmq->rspSem);
Expand Down Expand Up @@ -1171,7 +1171,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
goto CREATE_MSG_FAIL;
}
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL;
Expand Down Expand Up @@ -1204,7 +1204,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
Expand Down Expand Up @@ -1394,7 +1394,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
} else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
}

if (pWorker == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) return -1;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
pRpc->pCont = NULL;

dTrace("msg:%p, is created and will put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen);
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
if (code != 0) {
dTrace("msg:%p, is freed", pMsg);
Expand Down
6 changes: 3 additions & 3 deletions source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}

int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) return -1;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
pRpc->pCont = NULL;

switch (qtype) {
case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into qnode-query queue", pMsg);
dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0;
case READ_QUEUE:
case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg);
dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen);
taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);
return 0;
default:
Expand Down
6 changes: 3 additions & 3 deletions source/dnode/mgmt/mgmt_snode/src/smWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) {
}

int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
Expand All @@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {

SSnode *pSnode = pMgmt->pSnode;
if (pSnode == NULL) {
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype);
dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,15 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}

int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) {
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
return -1;
}

SMsgHead *pHead = pRpc->pCont;
dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType));
dTrace("vgId:%d, msg:%p is created, type:%s len:%d", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType), pRpc->contLen);

pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/mgmt/node_mgmt/src/dmTransport.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}

pRpc->info.wrapper = pWrapper;
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen);
if (pMsg == NULL) goto _OVER;

memcpy(pMsg, pRpc, sizeof(SRpcMsg));
dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, pRpc->contLen);

code = dmProcessNodeMsg(pWrapper, pMsg);

Expand Down
2 changes: 1 addition & 1 deletion source/dnode/vnode/src/sma/smaRollup.c
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu
tb_uid_t suid) {
const SSubmitReq *pReq = (const SSubmitReq *)pMsg;

void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM);
void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM, 0);
if (!qItem) {
return TSDB_CODE_FAILED;
}
Expand Down
4 changes: 2 additions & 2 deletions source/dnode/vnode/src/tq/tq.c
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver);

if (!failed) {
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM);
SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK;
pRefBlock->pBlock = pDelBlock;
pRefBlock->dataRef = pRef;
Expand Down Expand Up @@ -1303,7 +1303,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
}

#if 0
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
pStreamBlock->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock));
SSDataBlock block = {0};
Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/dataDeleter.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) {

static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM);
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0);
if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
Expand Down
2 changes: 1 addition & 1 deletion source/libs/executor/src/dataDispatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {

static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0);
if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY;
}
Expand Down
6 changes: 3 additions & 3 deletions source/libs/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
}

if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0);
if (trigger == NULL) return;
trigger->type = STREAM_INPUT__GET_RES;
trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
Expand Down Expand Up @@ -112,7 +112,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
}

int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status;

// enqueue
Expand Down Expand Up @@ -150,7 +150,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR
}

int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
int8_t status = TASK_INPUT_STATUS__NORMAL;

// enqueue
Expand Down
6 changes: 3 additions & 3 deletions source/libs/stream/src/streamData.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
}

SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0);
if (pDataSubmit == NULL) return NULL;
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
if (pDataSubmit->dataRef == NULL) goto FAIL;
Expand All @@ -81,7 +81,7 @@ SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
}

SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM);
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
if (pMerged == NULL) return NULL;
pMerged->reqs = taosArrayInit(0, sizeof(void*));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
Expand All @@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit)
}

SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) {
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0);
if (pSubmitClone == NULL) {
return NULL;
}
Expand Down
4 changes: 2 additions & 2 deletions source/libs/stream/src/streamExec.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
taosArrayDestroy(pRes);
break;
}
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
terrno = TSDB_CODE_OUT_OF_MEMORY;
Expand Down Expand Up @@ -235,7 +235,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("stream task %d exec end", pTask->taskId);

if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
streamFreeQitem(input);
Expand Down
6 changes: 3 additions & 3 deletions source/libs/sync/test/sync_test_lib/src/syncIO.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
syncRpcMsgLog2(logBuf, pMsg);

SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, pMsg, sizeof(SRpcMsg));

STaosQueue *pMsgQ = gSyncIO->pMsgQ;
Expand Down Expand Up @@ -381,7 +381,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg);
SSyncIO *io = pParent;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
}
Expand Down Expand Up @@ -441,7 +441,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg);
taosWriteQitem(io->pMsgQ, pTemp);
Expand Down
2 changes: 1 addition & 1 deletion source/libs/transport/test/svrBench.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void *processShellMsg(void *arg) {
void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg *pTemp;

pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0);
memcpy(pTemp, pMsg, sizeof(SRpcMsg));

int32_t idx = balance % multiQ->numOfThread;
Expand Down
12 changes: 8 additions & 4 deletions source/util/src/tqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,24 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
return memOfItems;
}

void *taosAllocateQitem(int32_t size, EQItype itype) {
void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) {
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}

pNode->dataSize = dataSize;
pNode->size = size;
pNode->itype = itype;
pNode->timestamp = taosGetTimestampUs();

if (itype == RPC_QITEM) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size);
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
if (alloced > tsRpcQueueMemoryAllowed) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsRpcQueueMemoryUsed);
atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
taosMemoryFree(pNode);
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
return NULL;
Expand All @@ -139,8 +143,8 @@ void taosFreeQitem(void *pItem) {
if (pItem == NULL) return;

STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
if (pNode->itype > 0) {
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size);
if (pNode->itype == RPC_QITEM) {
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize);
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
} else {
uTrace("item:%p, node:%p is freed", pItem, pNode);
Expand Down

0 comments on commit aa512f1

Please sign in to comment.