Skip to content

Commit

Permalink
Merge pull request #16872 from taosdata/3.0
Browse files Browse the repository at this point in the history
enh(tsc): handle deadlock
  • Loading branch information
guanshengliang authored Sep 15, 2022
2 parents 024b2ca + ef5c9cf commit 919dd4e
Showing 1 changed file with 30 additions and 6 deletions.
36 changes: 30 additions & 6 deletions source/client/src/clientHb.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId;
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes, pTscObj->pAppInfo->totalDnodes);
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
pTscObj->pAppInfo->totalDnodes);

if (pRsp->query->killRid) {
tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
Expand Down Expand Up @@ -297,7 +298,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {

if (code != 0) {
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes,
(*pInst)->totalDnodes);
}

if (rspNum) {
Expand Down Expand Up @@ -657,6 +659,8 @@ int32_t hbGatherAppInfo(void) {

for (int32_t i = 0; i < sz; ++i) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) continue;

uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
Expand Down Expand Up @@ -694,15 +698,21 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo();
}

SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) {
continue;
}

int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) {
taosArrayPush(mgr, &pAppHbMgr);
continue;
}
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
if (pReq == NULL) {
if (pReq == NULL || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
tFreeClientHbBatchReq(pReq);
continue;
}
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
Expand All @@ -711,6 +721,7 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosArrayPush(mgr, &pAppHbMgr);
break;
}

Expand All @@ -722,14 +733,15 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf);
taosArrayPush(mgr, &pAppHbMgr);
break;
}
pInfo->fp = hbAsyncCallBack;
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = strdup(pAppHbMgr->key);
pInfo->paramFreeFp = taosMemoryFree;
pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;

Expand All @@ -741,8 +753,12 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr);

atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosArrayPush(mgr, &pAppHbMgr);
}

taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = mgr;

taosThreadMutexUnlock(&clientHbMgr.lock);

taosMsleep(HEARTBEAT_INTERVAL);
Expand Down Expand Up @@ -834,7 +850,7 @@ void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
if (pItem == *pAppHbMgr) {
hbFreeAppHbMgr(*pAppHbMgr);
*pAppHbMgr = NULL;
taosArrayRemove(clientHbMgr.appHbMgrs, i);
taosArraySet(clientHbMgr.appHbMgrs, i, pAppHbMgr);
break;
}
}
Expand All @@ -845,6 +861,7 @@ void appHbMgrCleanup(void) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pTarget == NULL) continue;
hbFreeAppHbMgr(pTarget);
}
}
Expand All @@ -859,7 +876,14 @@ int hbMgrInit() {

clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
taosThreadMutexInit(&clientHbMgr.lock, NULL);

TdThreadMutexAttr attr = {0};
taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
int ret = taosThreadMutexAttrInit(&attr);
assert(ret == 0);

taosThreadMutexInit(&clientHbMgr.lock, &attr);
taosThreadMutexAttrDestroy(&attr);

// init handle funcs
hbMgrInitHandle();
Expand Down

0 comments on commit 919dd4e

Please sign in to comment.